暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

0022.S StarRocks本地文件导入(Stream Load)

rundba 2022-05-23
3260

StarRocks本地文件导入(Stream Load)




本文通过Stream Load方式将本地文件导入到StarRocks进行展示。



1. StarRocks数据导入




   

        用户创建表之后, 导入数据填充表。


  • 支持导入数据源有: 本地文件, HDFS, Kafka和S3.

  • 支持导入方式有: 批量导入, 流式导入, 实时导入.

  • 支持的数据格式有: CSV, Parquet, ORC等.

  • 导入发起方式有: 用RESTful接口, 执行SQL命令.



2. StarRocks导入方式




      为适配不同的数据导入需求,StarRocks系统提供了5种不同的导入方式,以支持不同的数据源(如HDFS、Kafka、本地文件等),或者按不同的方式(异步或同步)导入数据。


  • Broker Load

      Broker Load 通过 Broker 进程访问并读取外部数据源,然后采用 MySQL 协议向 StarRocks 创建导入作业。

        Broker Load适用于源数据在Broker进程可访问的存储系统(如HDFS)中,数据量为几十GB到上百GB。数据源有Hive等。


  • Spark Load

      Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks集群的计算资源。

      Spark Load适用于初次迁移大数据量(可到TB级别)到StarRocks的场景,且源数据在Spark可访问存储系统(如HDFS)中。


  • Stream Load

      Stream Load是一种同步执行的导入方式。用户通过 HTTP 协议发送请求将本地文件或数据流导入到 StarRocks中,并等待系统返回导入的结果状态,从而判断导入是否成功。

      Stream Load适用于导入本地文件,或通过程序导入数据流中的数据。数据源有Flink、CSV等。


  • Routine Load

      Routine Load(例行导入)提供了一种自动从指定数据源进行数据导入的功能。用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如 Kafka)中读取数据并导入到 StarRocks 中。


  • Insert Into

      类似 MySQL 中的 Insert 语句,StarRocks 提供 INSERT INTO tbl SELECT ...; 的方式从 StarRocks 的表中读取数据并导入到另一张表。或者通过 INSERT INTO tbl VALUES(...); 插入单条数据。数据源有DataX/DTS、Kettle/Informatic、StarRocks本身。

      本次采用流式方式将操作系统本地文件导入到StarRocks进行模拟演示。



3. 创建导入测试表





1)单分区

      建立一个名字为 table1 的逻辑表。分桶列为 siteid,桶数为 10。


这个表的 schema 如下:


  • siteid:类型是INT(4字节), 默认值为10

  • citycode:类型是SMALLINT(2字节)

  • username:类型是VARCHAR, 最大长度为32, 默认值为空字符串

  • pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, StarRocks内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)


建表语句如下:


    CREATE TABLE table1
    (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY HASH(siteid) BUCKETS 10
    PROPERTIES("replication_num" = "1");


    2) 复合分区

          建立一个名字为 table2 的逻辑表。


    这个表的 schema 如下:


    • event_day:类型是DATE,无默认值

    • siteid:类型是INT(4字节), 默认值为10

    • citycode:类型是SMALLINT(2字节)

    • username:类型是VARCHAR, 最大长度为32, 默认值为空字符串

    • pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, StarRocks内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)

    • 我们使用 event_day 列作为分区列,建立3个分区: p201706, p201707, p201708

                 p201706:范围为 [最小值, 2017-07-01)

                 p201707:范围为 [2017-07-01, 2017-08-01)

                 p201708:范围为 [2017-08-01, 2017-09-01)

                 注意区间为左闭右开。


    每个分区使用 siteid 进行哈希分桶,桶数为10


    建表语句如下:


      CREATE TABLE table2
      (
      event_day DATE,
      siteid INT DEFAULT '10',
      citycode SMALLINT,
      username VARCHAR(32) DEFAULT '',
      pv BIGINT SUM DEFAULT '0'
      )
      AGGREGATE KEY(event_day, siteid, citycode, username)
      PARTITION BY RANGE(event_day)
      (
      PARTITION p201706 VALUES LESS THAN ('2017-07-01'),
      PARTITION p201707 VALUES LESS THAN ('2017-08-01'),
      PARTITION p201708 VALUES LESS THAN ('2017-09-01')
      )
      DISTRIBUTED BY HASH(siteid) BUCKETS 10
      PROPERTIES("replication_num" = "1");


      3) 确认表信息

            example_db中表已经存在

        mysql> show tables;
        +----------------------+
        | Tables_in_example_db |
        +----------------------+
        | table1 |
        | table2 |
        +----------------------+
        2 rows in set (0.00 sec)


        4. 流式导入





               流式导入通过 HTTP 协议向 StarRocks 传输数据,可以不依赖其他系统或组件直接导入本地数据。详细语法帮助可以参阅 HELP STREAM LOAD。


        示例1:以 "table1_20170707" 为 Label,使用本地文件 table1_data 导入 table1 表。

          curl --location-trusted -u test:StarRocks -H "label:table1_20170707" -H "column_separator:," -T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load

          如:

            curl --location-trusted -u test:StarRocks -H "label:table1_20170707" -H "column_separator:," -T table1_data http://192.168.80.32:8030/api/example_db/table1/_stream_load


                   FE_HOST 是任一 FE 所在节点 IP,8030 为 fe.conf 中的 http_port。

            可以使用任一 BE 的 IP,以及 be.conf 中的 webserver_port 进行导入。如:BE_HOST:8040。


                   本地文件table1_data以","作为数据之间的分隔,具体内容如下:


              cat >> table1_data << RUNDBA
              1,1,jim,2
              2,1,grace,2
              3,2,tom,2
              4,3,bush,3
              5,3,helen,3
              RUNDBA


              table1导入后数据

                mysql> select * from table1;
                +--------+----------+----------+------+
                | siteid | citycode | username | pv |
                +--------+----------+----------+------+
                | 3 | 2 | tom | 2 |
                | 4 | 3 | bush | 3 |
                | 1 | 1 | jim | 2 |
                | 5 | 3 | helen | 3 |
                | 2 | 1 | grace | 2 |
                +--------+----------+----------+------+
                5 rows in set (0.01 sec)


                示例2: 以 "table2_20170707" 为 Label,使用本地文件 table2_data 导入 table2 表。

                  curl --location-trusted -u test:test -H "label:table2_20170707" -H "column_separator:|" -T table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load


                  如:

                    curl --location-trusted -u test:StarRocks -H "label:table2_20170707" -H "column_separator:|" -T table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load


                    本地文件 table2_data 以 | 作为数据之间的分隔,具体内容如下:


                      cat >> table2_data << KUHN
                      2017-07-03|1|1|jim|2
                      2017-07-05|2|1|grace|2
                      2017-07-12|3|2|tom|2
                      2017-07-15|4|3|bush|3
                      2017-07-12|5|3|helen|3
                      KUHN


                      table2导入后数据:

                        mysql> select * from table2;
                        +------------+--------+----------+----------+------+
                        | event_day | siteid | citycode | username | pv |
                        +------------+--------+----------+----------+------+
                        | 2017-07-03 | 1 | 1 | jim | 2 |
                        | 2017-07-12 | 5 | 3 | helen | 3 |
                        | 2017-07-05 | 2 | 1 | grace | 2 |
                        | 2017-07-12 | 3 | 2 | tom | 2 |
                        | 2017-07-15 | 4 | 3 | bush | 3 |
                        +------------+--------+----------+----------+------+
                        5 rows in set (0.01 sec)



                        5. 小结





                               本文通过StarRocks数据导入方式进行了介绍,演示了Stream Load方式对数据进行导入,当然数据源有Flink、CSV等。实际中结合自己环境进行匹配导入。


                        —END—


                         


                        文章转载自rundba,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论