暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Starrocks从DataX 导入

原创 飞鸟-柯 2023-02-27
1781

本文介绍如何利用 DataX 基于 StarRocks 开发的 StarRocks Writer 插件将 MySQL、Oracle 等数据库中的数据导入至 StarRocks。该插件将数据转化为 CSV 或 JSON 格式并将其通过 Stream Load 方式批量导入至 StarRocks。

DataX 导入支持多种数据源,您可以参考 DataX - Support Data Channels 了解详情。

前提条件

使用 DataX 导入数据前,请确保已安装以下依赖:

安装 StarRocks Writer

下载 StarRocks Writer 安装包,然后运行如下命令将安装包解压至 datax/plugin/writer 路径下。

tar -xzvf starrockswriter.tar.gz

您也可以编译 StarRocks Writer 源码

创建配置文件

为导入作业创建 JSON 格式配置文件。

以下示例模拟 MySQL 至 StarRocks 导入作业的配置文件。您可以根据实际使用场景修改相应参数。

{ "job": { "setting": { "speed": { "channel": 1 }, "errorLimit": { "record": 0, "percentage": 0 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "xxxx", "password": "xxxx", "column": [ "k1", "k2", "v1", "v2" ], "connection": [ { "table": [ "table1", "table2" ], "jdbcUrl": [ "jdbc:mysql://x.x.x.x:3306/datax_test1" ] }, { "table": [ "table3", "table4" ], "jdbcUrl": [ "jdbc:mysql://x.x.x.x:3306/datax_test2" ] } ] } }, "writer": { "name": "starrockswriter", "parameter": { "username": "xxxx", "password": "xxxx", "database": "xxxx", "table": "xxxx", "column": ["k1", "k2", "v1", "v2"], "preSql": [], "postSql": [], "jdbcUrl": "jdbc:mysql://y.y.y.y:9030/", "loadUrl": ["y.y.y.y:8030", "y.y.y.y:8030"], "loadProps": {} } } } ] } }

您需要在 writer 部分配置以下参数:

参数说明必选默认值
usernameStarRocks 集群用户名。
passwordStarRocks 集群用户密码。
databaseStarRocks 目标数据库名称。
tableStarRocks 目标表名称。
loadUrlStarRocks FE 节点的地址,格式为 "fe_ip:fe_http_port"。多个地址使用逗号(,)分隔。
column目标表需要写入数据的列,多列使用逗号(,)分隔。例如: "column": ["id","name","age"]。该参数的对应关系与列名无关,但与其顺序一一对应。建议与 reader 中的 column 一样。
preSql标准 SQL 语句,在数据写入到目标表执行。
postSql标准 SQL 语句,在数据写入到目标表执行。
jdbcUrl目标数据库的 JDBC 连接信息,用于执行 preSql 及 postSql
maxBatchRows单次 Stream Load 导入的最大行数。导入大量数据时,StarRocks Writer 将根据 maxBatchRows 或 maxBatchSize 将数据分为多个 Stream Load 作业分批导入。500000
maxBatchSize单次 Stream Load 导入的最大字节数,单位为 Byte。导入大量数据时,StarRocks Writer 将根据 maxBatchRows 或 maxBatchSize 将数据分为多个 Stream Load 作业分批导入。104857600
flushInterval上一次 Stream Load 结束至下一次开始的时间间隔,单位为 ms。300000
loadPropsStream Load 的导入参数,详情参考 STREAM LOAD

注意

  • column 参数不能留空。如果您希望导入所有列,可以配置该项为 "*"
  • writer 部分中 column 的数量必须与 reader 部分一致。

配置特殊参数

  • 设置分隔符

默认设置下,数据会被转化为字符串,以 CSV 格式通过 Stream Load 导入至 StarRocks。字符串以 \t 作为列分隔符,\n 作为行分隔符。

您可以通过在参数 loadProps 中添加以下配置,以更改分隔符:

"loadProps": { "column_separator": "\\x01", "row_delimiter": "\\x02" }

其中,\x 代表 16 进制,额外的 \ 做转义 x 用。

说明

StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符。

  • 设置导入格式

当前导入方式的默认导入格式为 CSV。您可以通过在参数 loadProps 中添加以下配置更改数据格式为 JSON:

"loadProps": { "format": "json", "strip_outer_array": true }

  • 导入衍生列

如果您希望在向 StarRocks 表中导入原始数据的同时导入衍生列,则需要在 loadProps 里额外设置 columns 参数。

以下代码片段展示导入原始数据列 ab 以及衍生列 c

"writer": { "column": ["a", "b"], "loadProps": { "columns": "a,b,c=murmur_hash3_32(a)" } ... }

其中,您需要在 column 项中填写原始数据中的列名,并在 columns 项中额外填写衍生列名和其对应的数据处理方式。

启动导入任务

完成配置文件设定后,您需要通过命令行启动导入任务。

以下示例基于前述步骤中的配置文件 job.json 启动导入任务,并设置了 JVM 调优参数(--jvm="-Xms6G -Xmx6G")以及日志等级(--loglevel=debug)。您可以根据实际使用场景修改相应参数。

python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

如果源数据库与目标数据库时区不同,您需要命令行中添加 -Duser.timezone=GMTxxx 选项设置源数据库的时区信息。

例如,源库使用 UTC 时区,则启动任务时需添加参数 -Duser.timezone=GMT+0

查看导入任务状态

由于 DataX 导入是基于封装的 Stream Load 实现,您可以在 datax/log/$date/ 目录下搜索对应的导入作业日志,日志文件名字中包含导入使用的 JSON 文件名和任务启动时间(小时、分钟、秒),例如:t_datax_job_job_json-20_52_19.196.log

  • 如果日志中有 "http://fe_ip:fe_http_port/api/db_name/tbl_name/_stream_load" 记录生成,则表示 Stream Load 作业已成功触发。作业成功触发后,您可参考 Stream Load 返回值 查看任务情况。
  • 日志中如果没有上述信息,请根据报错信息排查问题,或者在 DataX 社区问题中寻找解决方案。

注意

使用 DataX 导入时,当存在不符合目标表格式的数据,若 StarRocks 中该字段允许 NULL 值,不合法的数据将转为 NULL 后正常导入。若字段不允许 NULL 值,且任务中未配置容错率(max_filter_ratio),则任务报错,导入作业中断。

取消或停止导入任务

您可以通过终止 DataX 的 Python 进程停止导入任务。

pkill datax

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论