StarRocks本地文件导入(Stream Load)
本文通过Stream Load方式将本地文件导入到StarRocks进行展示。

1. StarRocks数据导入
用户创建表之后, 导入数据填充表。
支持导入数据源有: 本地文件, HDFS, Kafka和S3.
支持导入方式有: 批量导入, 流式导入, 实时导入.
支持的数据格式有: CSV, Parquet, ORC等.
导入发起方式有: 用RESTful接口, 执行SQL命令.
2. StarRocks导入方式
为适配不同的数据导入需求,StarRocks系统提供了5种不同的导入方式,以支持不同的数据源(如HDFS、Kafka、本地文件等),或者按不同的方式(异步或同步)导入数据。
Broker Load
Broker Load 通过 Broker 进程访问并读取外部数据源,然后采用 MySQL 协议向 StarRocks 创建导入作业。
Broker Load适用于源数据在Broker进程可访问的存储系统(如HDFS)中,数据量为几十GB到上百GB。数据源有Hive等。
Spark Load
Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks集群的计算资源。
Spark Load适用于初次迁移大数据量(可到TB级别)到StarRocks的场景,且源数据在Spark可访问存储系统(如HDFS)中。
Stream Load
Stream Load是一种同步执行的导入方式。用户通过 HTTP 协议发送请求将本地文件或数据流导入到 StarRocks中,并等待系统返回导入的结果状态,从而判断导入是否成功。
Stream Load适用于导入本地文件,或通过程序导入数据流中的数据。数据源有Flink、CSV等。
Routine Load
Routine Load(例行导入)提供了一种自动从指定数据源进行数据导入的功能。用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如 Kafka)中读取数据并导入到 StarRocks 中。
Insert Into
类似 MySQL 中的 Insert 语句,StarRocks 提供 INSERT INTO tbl SELECT ...; 的方式从 StarRocks 的表中读取数据并导入到另一张表。或者通过 INSERT INTO tbl VALUES(...); 插入单条数据。数据源有DataX/DTS、Kettle/Informatic、StarRocks本身。
本次采用流式方式将操作系统本地文件导入到StarRocks进行模拟演示。
3. 创建导入测试表
1)单分区
建立一个名字为 table1 的逻辑表。分桶列为 siteid,桶数为 10。
这个表的 schema 如下:
siteid:类型是INT(4字节), 默认值为10
citycode:类型是SMALLINT(2字节)
username:类型是VARCHAR, 最大长度为32, 默认值为空字符串
pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, StarRocks内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)
建表语句如下:
CREATE TABLE table1(siteid INT DEFAULT '10',citycode SMALLINT,username VARCHAR(32) DEFAULT '',pv BIGINT SUM DEFAULT '0')AGGREGATE KEY(siteid, citycode, username)DISTRIBUTED BY HASH(siteid) BUCKETS 10PROPERTIES("replication_num" = "1");
2) 复合分区
建立一个名字为 table2 的逻辑表。
这个表的 schema 如下:
event_day:类型是DATE,无默认值
siteid:类型是INT(4字节), 默认值为10
citycode:类型是SMALLINT(2字节)
username:类型是VARCHAR, 最大长度为32, 默认值为空字符串
pv:类型是BIGINT(8字节), 默认值是0; 这是一个指标列, StarRocks内部会对指标列做聚合操作, 这个列的聚合方法是求和(SUM)
我们使用 event_day 列作为分区列,建立3个分区: p201706, p201707, p201708
p201706:范围为 [最小值, 2017-07-01)
p201707:范围为 [2017-07-01, 2017-08-01)
p201708:范围为 [2017-08-01, 2017-09-01)
注意区间为左闭右开。
每个分区使用 siteid 进行哈希分桶,桶数为10
建表语句如下:
CREATE TABLE table2(event_day DATE,siteid INT DEFAULT '10',citycode SMALLINT,username VARCHAR(32) DEFAULT '',pv BIGINT SUM DEFAULT '0')AGGREGATE KEY(event_day, siteid, citycode, username)PARTITION BY RANGE(event_day)(PARTITION p201706 VALUES LESS THAN ('2017-07-01'),PARTITION p201707 VALUES LESS THAN ('2017-08-01'),PARTITION p201708 VALUES LESS THAN ('2017-09-01'))DISTRIBUTED BY HASH(siteid) BUCKETS 10PROPERTIES("replication_num" = "1");
3) 确认表信息
example_db中表已经存在
mysql> show tables;+----------------------+| Tables_in_example_db |+----------------------+| table1 || table2 |+----------------------+2 rows in set (0.00 sec)
4. 流式导入
流式导入通过 HTTP 协议向 StarRocks 传输数据,可以不依赖其他系统或组件直接导入本地数据。详细语法帮助可以参阅 HELP STREAM LOAD。
示例1:以 "table1_20170707" 为 Label,使用本地文件 table1_data 导入 table1 表。
curl --location-trusted -u test:StarRocks -H "label:table1_20170707" -H "column_separator:," -T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load
如:
curl --location-trusted -u test:StarRocks -H "label:table1_20170707" -H "column_separator:," -T table1_data http://192.168.80.32:8030/api/example_db/table1/_stream_load
FE_HOST 是任一 FE 所在节点 IP,8030 为 fe.conf 中的 http_port。
可以使用任一 BE 的 IP,以及 be.conf 中的 webserver_port 进行导入。如:BE_HOST:8040。
本地文件table1_data以","作为数据之间的分隔,具体内容如下:
cat >> table1_data << RUNDBA1,1,jim,22,1,grace,23,2,tom,24,3,bush,35,3,helen,3RUNDBA
table1导入后数据
mysql> select * from table1;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+| 3 | 2 | tom | 2 || 4 | 3 | bush | 3 || 1 | 1 | jim | 2 || 5 | 3 | helen | 3 || 2 | 1 | grace | 2 |+--------+----------+----------+------+5 rows in set (0.01 sec)
示例2: 以 "table2_20170707" 为 Label,使用本地文件 table2_data 导入 table2 表。
curl --location-trusted -u test:test -H "label:table2_20170707" -H "column_separator:|" -T table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load
如:
curl --location-trusted -u test:StarRocks -H "label:table2_20170707" -H "column_separator:|" -T table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load
本地文件 table2_data 以 | 作为数据之间的分隔,具体内容如下:
cat >> table2_data << KUHN2017-07-03|1|1|jim|22017-07-05|2|1|grace|22017-07-12|3|2|tom|22017-07-15|4|3|bush|32017-07-12|5|3|helen|3KUHN
table2导入后数据:
mysql> select * from table2;+------------+--------+----------+----------+------+| event_day | siteid | citycode | username | pv |+------------+--------+----------+----------+------+| 2017-07-03 | 1 | 1 | jim | 2 || 2017-07-12 | 5 | 3 | helen | 3 || 2017-07-05 | 2 | 1 | grace | 2 || 2017-07-12 | 3 | 2 | tom | 2 || 2017-07-15 | 4 | 3 | bush | 3 |+------------+--------+----------+----------+------+5 rows in set (0.01 sec)
5. 小结
本文通过StarRocks数据导入方式进行了介绍,演示了Stream Load方式对数据进行导入,当然数据源有Flink、CSV等。实际中结合自己环境进行匹配导入。
—END—





