
Initial Experience with the InfluxDB-python Client

灵感来了一半的灵感 2021-08-09

0. Preface

These past two weeks all sorts of odd things kept happening. The InfluxDB-python client work that should have wrapped up last week was interrupted several times, last week ended with an exasperating case of data loss, and no sooner had I fixed that this week than more oddities left me dizzy. The good news is that the write-path code is now debugged and working. The query code, in Python, is just a set of strings, much like querying MySQL from Python, and the specific Flux statements can be looked up in the official docs as needed; visualization will have to wait.

Now, on to today's content. First comes a brief introduction to the InfluxDB client, using the Python client as the example; then an official example is presented and unpacked; finally, a few problems from my own practice and how I resolved them.

1. The InfluxDB-python Client

The main reference URLs are:

https://docs.influxdata.com/influxdb/v2.0/api-guide/client-libraries/#

https://github.com/influxdata/influxdb-client-python

The InfluxDB client is available in several languages; I used the Python client because I am more comfortable with Python.

To avoid losing momentum and never getting the core content written up, the basics are only described briefly here.

First, install and import the library:

    pip install 'influxdb-client[ciso]'

    import influxdb_client

Next, a quick overview of usage. The client breaks down into the client configuration, the write API, the query API, and whatever other APIs you need. The client can be configured either by assigning values directly or through a configuration file:

    self.client = InfluxDBClient.from_config_file("config.ini")

Contents of the configuration file:

    [influx2]
    url=http://localhost:8086
    org=my-org
    token=my-token
    timeout=6000
    verify_ssl=False
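For comparison, configuring the client by direct assignment (mentioned above) looks roughly like this; url, token and org are the usual placeholders from the documentation:

    from influxdb_client import InfluxDBClient

    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")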

Depending on your needs, other variables can also go into the configuration file, which is what I did so that users never have to type values directly into the code. For how to read a configuration file in Python, see:

    from configparser import ConfigParser
    # ……
    conf = ConfigParser()
    conf.read("config.ini")
    bucket_ = conf.get('influx2', 'bucket')
    # ……

Note that everything you get() from the configuration file is a string: cast numeric values to the appropriate type, and consider eval() for lists and the like.
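As a small illustration, assuming the [influx2] section also carries two custom keys, batch_size and bucket_list (both hypothetical names):

    from configparser import ConfigParser

    conf = ConfigParser()
    conf.read("config.ini")
    batch_size = int(conf.get('influx2', 'batch_size'))      # numeric values need an explicit cast
    bucket_list = eval(conf.get('influx2', 'bucket_list'))   # e.g. the string "['b1', 'b2']" becomes a list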

For writes, pay attention to write_options. There are three modes: asynchronous (ASYNCHRONOUS), synchronous (SYNCHRONOUS), and batching (write_options=WriteOptions(……)). As one expert explained it to me, in parallel processing an asynchronous flow grabs compute resources again as soon as its own processing finishes, whereas a synchronous flow waits until the whole batch of flows has completed before the next round of resources is allocated; batching lets you specify the write behaviour yourself, with the parameters listed below (a short mode-selection sketch follows the table):

    Property          Description (default value)
    batch_size        the number of data points to collect in a batch (default: 1000)
    flush_interval    the number of milliseconds before the batch is written (default: 1000)
    jitter_interval   the number of milliseconds by which to randomly increase the batch flush interval (default: 0)
    retry_interval    the number of milliseconds to wait before retrying the first unsuccessful write; subsequent retry delays are computed using exponential random backoff; used when the InfluxDB server does not send a "Retry-After" header (default: 5000)
    max_retry_time    maximum total retry timeout in milliseconds (default: 180_000)
    max_retries       maximum number of retries when a write fails (default: 5)
    max_retry_delay   maximum delay between retry attempts in milliseconds (default: 125_000)
    exponential_base  the base for the exponential retry delay; the next delay is a random value within the interval [retry_interval * exponential_base^(attempts-1), retry_interval * exponential_base^(attempts)]; for retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, max_retries=5, the retry delays fall randomly within [5_000-10_000], [10_000-20_000], [20_000-40_000], [40_000-80_000], [80_000-125_000] (default: 2)
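A minimal sketch of how the three modes are selected (the bucket name and the line-protocol record below are placeholders):

    from influxdb_client import InfluxDBClient, WriteOptions
    from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS

    client = InfluxDBClient.from_config_file("config.ini")

    # synchronous: write() blocks until the server has accepted the data
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # asynchronous: write() returns immediately and the data is sent in the background
    # write_api = client.write_api(write_options=ASYNCHRONOUS)

    # batching: the client buffers points and flushes them by size/time, with retries
    # write_api = client.write_api(write_options=WriteOptions(
    #     batch_size=1000, flush_interval=1000, jitter_interval=0,
    #     retry_interval=5000, max_retries=5, max_retry_delay=125_000, exponential_base=2))

    write_api.write(bucket="my-bucket", record="my-measurement,tag1=a value=1.0")
    write_api.close()
    client.close()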

There are several ways to query; one of the official examples covers them all. For my own needs I used the DataFrame approach:

    query_api.query_data_frame(……)
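A minimal query sketch, assuming a bucket "my-bucket" and a measurement "my-measurement" (both placeholders); query_data_frame() returns a pandas DataFrame, or a list of DataFrames when the result contains several tables:

    from influxdb_client import InfluxDBClient

    client = InfluxDBClient.from_config_file("config.ini")
    flux = '''
    from(bucket: "my-bucket")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement == "my-measurement")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    df = client.query_api().query_data_frame(flux)
    print(df if not isinstance(df, list) else df[0])
    client.close()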

2. A Parallel Processing Example

Here is a parallel-write example from the official repository:

                """
                Import public NYC taxi and for-hire vehicle (Uber, Lyft, etc.) trip data into InfluxDB 2.0


                https://github.com/toddwschneider/nyc-taxi-data
                """
                import concurrent.futures
                import io
                import multiprocessing
                from collections import OrderedDict
                from csv import DictReader
                from datetime import datetime
                from multiprocessing import Value
                from urllib.request import urlopen


                import rx
                from rx import operators as ops


                from influxdb_client import Point, InfluxDBClient, WriteOptions
                from influxdb_client.client.write_api import WriteType




                class ProgressTextIOWrapper(io.TextIOWrapper):
                """
                TextIOWrapper that store progress of read.
                """
                def __init__(self, *args, **kwargs):
                io.TextIOWrapper.__init__(self, *args, **kwargs)
                self.progress = None
                pass


                def readline(self, *args, **kwarg) -> str:
                readline = super().readline(*args, **kwarg)
                self.progress.value += len(readline)
                return readline




                class InfluxDBWriter(multiprocessing.Process):
                """
                Writer that writes data in batches with 50_000 items.
                """
                def __init__(self, queue):
                multiprocessing.Process.__init__(self)
                self.queue = queue
                self.client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
                self.write_api = self.client.write_api(
                write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))


                def run(self):
                while True:
                next_task = self.queue.get()
                if next_task is None:
                # Poison pill means terminate
                self.terminate()
                self.queue.task_done()
                break
                self.write_api.write(bucket="my-bucket", record=next_task)
                self.queue.task_done()


                def terminate(self) -> None:
                proc_name = self.name
                print()
                print('Writer: flushing data...')
                self.write_api.close()
                self.client.close()
                print('Writer: closed'.format(proc_name))




                def parse_row(row: OrderedDict):
                """Parse row of CSV file into Point with structure:


                taxi-trip-data,DOLocationID=152,PULocationID=79,dispatching_base_num=B02510 dropoff_datetime="2019-01-01 01:27:24" 1546304267000000000


                CSV format:
                dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
                B00001,2019-01-01 00:30:00,2019-01-01 02:51:55,,,
                B00001,2019-01-01 00:45:00,2019-01-01 00:54:49,,,
                B00001,2019-01-01 00:15:00,2019-01-01 00:54:52,,,
                B00008,2019-01-01 00:19:00,2019-01-01 00:39:00,,,
                B00008,2019-01-01 00:27:00,2019-01-01 00:37:00,,,
                B00008,2019-01-01 00:48:00,2019-01-01 01:02:00,,,
                B00008,2019-01-01 00:50:00,2019-01-01 00:59:00,,,
                B00008,2019-01-01 00:51:00,2019-01-01 00:56:00,,,
                B00009,2019-01-01 00:44:00,2019-01-01 00:58:00,,,
                B00009,2019-01-01 00:19:00,2019-01-01 00:36:00,,,
                B00009,2019-01-01 00:36:00,2019-01-01 00:49:00,,,
                B00009,2019-01-01 00:26:00,2019-01-01 00:32:00,,,
                ...


                :param row: the row of CSV file
                :return: Parsed csv row to [Point]
                """


                return Point("taxi-trip-data") \
                .tag("dispatching_base_num", row['dispatching_base_num']) \
                .tag("PULocationID", row['PULocationID']) \
                .tag("DOLocationID", row['DOLocationID']) \
                .tag("SR_Flag", row['SR_Flag']) \
                .field("dropoff_datetime", row['dropoff_datetime']) \
                .time(row['pickup_datetime']) \
                .to_line_protocol()




                def parse_rows(rows, total_size):
                """
                Parse bunch of CSV rows into LineProtocol


                :param total_size: Total size of file
                :param rows: CSV rows
                :return: List of line protocols
                """
                _parsed_rows = list(map(parse_row, rows))


                counter_.value += len(_parsed_rows)
                if counter_.value % 10_000 == 0:
                print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
                .format(100 * float(progress_.value) float(int(total_size))) if total_size else ""))
                pass


                queue_.put(_parsed_rows)
                return None




                def init_counter(counter, progress, queue):
                """
                Initialize shared counter for display progress
                """
                global counter_
                counter_ = counter
                global progress_
                progress_ = progress
                global queue_
                queue_ = queue




                if __name__ == "__main__":
                """
                Create multiprocess shared environment
                """
                queue_ = multiprocessing.Manager().Queue()
                counter_ = Value('i', 0)
                progress_ = Value('i', 0)
                startTime = datetime.now()


                url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
                # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"


                """
                Open URL and for stream data
                """
                response = urlopen(url)
                if response.headers:
                content_length = response.headers['Content-length']
                io_wrapper = ProgressTextIOWrapper(response)
                io_wrapper.progress = progress_


                """
                Start writer as a new process
                """
                writer = InfluxDBWriter(queue_)
                writer.start()


                """
                Create process pool for parallel encoding into LineProtocol
                """
                cpu_count = multiprocessing.cpu_count()
                with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
                initargs=(counter_, progress_, queue_)) as executor:
                """
                Converts incoming HTTP stream into sequence of LineProtocol
                """
                data = rx \
                .from_iterable(DictReader(io_wrapper)) \
                .pipe(ops.buffer_with_count(10_000),
                # Parse 10_000 rows into LineProtocol on subprocess
                ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))


                """
                Write data into InfluxDB
                """
                data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))


                """
                Terminate Writer
                """
                queue_.put(None)
                queue_.join()


                print()
                print(f'Import finished in: {datetime.now() - startTime}')
                print()


                """
                Querying 10 pickups from dispatching 'B00008'
                """
                query = 'from(bucket:"my-bucket")' \
                '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
                '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
                '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
                '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
                '|> rename(columns: {_time: "pickup_datetime"})' \
                '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'


                client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
                result = client.query_api().query(query=query)


                """
                Processing results
                """
                print()
                print("=== Querying 10 pickups from dispatching 'B00008' ===")
                print()
                for table in result:
                for record in table.records:
                print(
                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')


                """
                Close client
                """
                client.close()


Breaking it down, the example roughly does the following: it creates a shared queue and shared counters, streams the CSV file over HTTP through a wrapper that tracks read progress, starts a dedicated writer process that writes batches of 50_000 line-protocol records, uses a process pool (fed by an rx pipeline that buffers 10_000 rows at a time) to parse CSV rows into line protocol in parallel, shuts the writer down with a poison pill, and finally queries some of the imported data back.

Of course, you can take this example as a starting point and adapt it to your own needs.

3. Some Problems and How I Handled Them

1. Parallel processing was slow, and during debugging I found parts of the code being called repeatedly.

    Before the main program code, remember to add the guard:

    if __name__ == "__main__":

2. The write stage was the big time sink: I/O was not what limited write throughput, the write API was. Three things helped, sketched in (a) to (c) below.

a. Move the write API into the parallel code, for example inside a function like the example's parse_rows; a sketch follows.
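As a sketch of (a), a worker function could parse and write inside the same subprocess, relying on a per-process global write_api set up by the pool initializer shown under (c) below; the function name and the bucket are hypothetical:

    def parse_and_write(rows, bucket="my-bucket"):
        # parse_row() is the function from the example in section 2; write_api is a per-process global
        lines = [parse_row(row) for row in rows]
        write_api.write(bucket=bucket, record=lines)
        return len(lines)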

b. To work around the DataFrame's own resource-release problem, consider wrapping it in a plain Python dataclass:

    from dataclasses import dataclass
    import pandas as pd

    @dataclass(repr=False, eq=False, order=False)
    class DataFrameContainer:
        df: pd.DataFrame = None

c. The client and the write API can be configured inside the pool's initializer function and closed via Finalize():

    import os
    from multiprocessing.util import Finalize

    from influxdb_client import InfluxDBClient
    from influxdb_client.client.write_api import SYNCHRONOUS

    def init_writer(que):
        global f_queue
        f_queue = que
        global client
        # ini_file is defined elsewhere in the surrounding code
        client = InfluxDBClient.from_config_file(ini_file, debug=False, enable_gzip=True)
        global write_api
        write_api = client.write_api(write_options=SYNCHRONOUS)
        print(f"writer started: {os.getpid()}")
        Finalize(write_api, write_api.close)
        Finalize(client, client.close)
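Tying (a) and (c) together, the pool can then be created with init_writer as its initializer; this is hypothetical wiring, assuming ini_file, queue_ and an iterable of row chunks already exist:

    import concurrent.futures
    import multiprocessing

    with concurrent.futures.ProcessPoolExecutor(
            max_workers=multiprocessing.cpu_count(),
            initializer=init_writer,
            initargs=(queue_,)) as executor:
        # each chunk of rows is parsed and written inside a worker process
        futures = [executor.submit(parse_and_write, chunk) for chunk in chunks]
        concurrent.futures.wait(futures)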
3. Data loss during writes.

I tried running the code as a .py file, adjusting the timeout parameter, switching asynchronous writes to synchronous ones, tuning the parameters of the batching mode, inspecting the files being read, and so on.

In the end, the problem turned out to be that when read_csv() builds the DataFrame, inconsistent types across some columns cause data to be dropped, and with asynchronous writes that error is never reported. So I converted every non-time column of the DataFrame to float and dropped the string columns, which solved most of the problem. Some data was still being lost, though; after switching the write mode from asynchronous to synchronous, that remaining loss disappeared for the data I tested.
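A minimal sketch of that workaround, assuming a DataFrame read from CSV with a timestamp index; the file, bucket and measurement names are placeholders:

    import pandas as pd
    from influxdb_client import InfluxDBClient
    from influxdb_client.client.write_api import SYNCHRONOUS

    df = pd.read_csv("data.csv", index_col="time", parse_dates=True)
    df = df.drop(columns=df.select_dtypes(include="object").columns)  # drop string columns
    df = df.astype(float)                                             # force all remaining columns to float

    client = InfluxDBClient.from_config_file("config.ini")
    # synchronous write: type/format errors surface immediately instead of being silently swallowed
    client.write_api(write_options=SYNCHRONOUS).write(
        bucket="my-bucket",
        record=df,
        data_frame_measurement_name="my-measurement",
    )
    client.close()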

4. Epilogue

The original plan was to cover querying and visualization in the same post, but that work is not finished yet, the query syntax can be found on the official site, and visualization can be done in Python anyway, so this post focused on parallel writes, which was, after all, the most time-consuming part.

Next week's post may deal with querying and visualization with the InfluxDB-python client, or it may be a review of earlier material.

Also, thanks to an impulsive move I made recently, August's professional review work may be a bit of a squeeze, but I will try to get it done.
