暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何使用 DataX 加载 CSV 数据文件到 OceanBase

SQL学习者 2023-07-22
296

DataX 简介

DataX 是阿里云 DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 、OceanBase 等各种异构数据源之间高效的数据同步功能。

开源地址:https://github.com/alibaba/datax 。

OceanBase 企业版客户,可以跟 OceanBase 的技术人员索取 DataX 内部版本(RPM 包)。 OceanBase 社区版客户,可以在 DataX 开源网站上下载源码,自行编译。 编译的时候,注意在 pom.xml 中剔除掉不用的数据库插件。否则,编译出来的包非常大。

DATAX 配置文件

DataX 以任务的形式迁移数据,每个任务只处理一个表,每个任务有一个 json 格式的配置文件。配置文件里会包含 reader 和 writer 两节。具体的 reader 和 writer 都是 DataX 支持的数据库插件,可以随意搭配使用(就跟孩子搭积木一样)

最新版本的 DataX 还提供了一个 WEB 管理界面。

下面是配置文件示例。

{

"job": {

"content": [

{

"reader": {

"name": "streamreader",

"parameter": {

"sliceRecordCount": 10,

"column": [

{

"type": "long",

"value": "10"

},

{

"type": "string",

"value": "hello,你好,世界-DataX"

}

]

}

},

"writer": {

"name": "streamwriter",

"parameter": {

"encoding": "UTF-8",

"print": true

}

}

}

],

"setting": {

"speed": {

"channel": 2

}

}

}

}

将 json 配置文件放到 datax的目录的 job 下,或者自定义路径。执行方法:

$bin/datax.py job/stream2stream.json

输出信息:

<.....>

2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!

2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%

2021-08-26 11:06:09.223 [job-0] INFO JobContainer -

任务启动时刻 : 2021-08-26 11:05:59

任务结束时刻 : 2021-08-26 11:06:09

任务总计耗时 : 10s

任务平均流量 : 38B/s

记录写入速度 : 2rec/s

读出记录总数 : 20

读写失败总数 : 0

DataX 任务执行结束都会有个简单的任务报告,关注一下里面平均流量、写入速度和读写失败总数等。

DataX 的 job 的参数 settings 可以指定速度参数、错误记录容忍度等。

"setting": {"speed": {"channel": 10

},"errorLimit": {"record": 10,"percentage": 0.1}}

特别说明:

  • speed 还有个限速的设计(bytes),但是有 bug,大家就不要用了。 errorLimit 表示报错记录数的容忍度,超出这个限制后任务就中断退出。
  • channel 是并发数,理论上并发越大,迁移性能越好。实际也要考虑源端的读压力、网络传输性能以及目标端写入性能。

下面是常用数据源(mysql、oracle 、csv 和 oceanbase )的读写插件。

txtfilereader 提供了读取本地文件系统数据存储的能力。在底层实现上,txtfilereader 获取本地文件数据,并转换为 DataX 传输协议传递给 Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如 CSV 格式的文本信息。

txtfilereader 有一些功能限制和参数,请首先阅读官方说明: https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md 。

下面是 txtfilereader 的 示例。

"reader": {

"name": "txtfilereader",

"parameter": {

"path": ["/tmp/tpcc/bmsql_oorder"],

"fileName": "bmsql_oorder",

"encoding": "UTF-8",

"column": ["*"],

"dateFormat": "yyyy-MM-dd hh:mm:ss" ,

"nullFormat": "\\N" ,

"fieldDelimiter": ","

}

}

特别说明:

  • path 指定到路径即可,fileName 会是生成的文件前缀,完整的文件名很长,有随机字符串(避免重复)。文件的数量可能是根据记录数来自动分的。
  • column 可以指定为 "*" ,这样所有字段值都作为字符串了。这个虽然方便,但不能保证完全没有问题。目前测试常用数据类型是可以的。
  • nullFormat 指定空值的记录,默认是"null",这个读入oracle的时候会有问题。建议导出文件的时候指定为 "\N" 表示空值和。
  • fieldDelimiter 指定 csv 文件的列分隔符,这个跟导出的时候指定的列分隔符保持一致。通常导出的列内容如果含有列分隔符时,会用双引号进行包含(enclosed)。用逗号(,)也可以,只是太过常见,建议用生僻一点的单字符。如 | 或 ^ 等。

mysqlreader 插件说明:

MysqlReader 插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。

不同于其他关系型数据库,MysqlReader 不支持 FetchSize.

实现原理方面,简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 。

下面是 mysqlreader 的配置示例。

"reader": {

"name": "mysqlreader",

"parameter": {

"username": "tpcc",

"password": "********",

"column": [

"*"

],

"connection": [

{

"table": [

"bmsql_oorder"

],

"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]

}

]

}

}

特别说明:

  • 如果表的主键是单列主键,比如说 id。那么可以在 parameter 下加一个配置: "splitPk": "db_id",
  • 。如果是加在最后,就去掉后面的逗号(,)。
  • column 指定读取的列。通常建议写具体的列,可以在列上用函数做逻辑转换。用 * 就是要时刻确保列跟目标端写入的列能对应上。

oceanbasev10writer 插件实现了写入数据到 OceanBase 主库的目的表的功能。在底层实现上, OceanbaseV10Writer 通过 java客户端(底层MySQL JDBC或oceanbase client) 连接obproxy 远程 OceanBase 数据库,并执行相应的 insert sql 语句将数据写入 OceanBase ,内部会分批次提交入库。

实现原理

Oceanbasev10Writer 通过 DataX 框架获取 Reader 生成的协议数据,生成 insert 语句。对于mysql 租户,在主键或唯一键冲突时,可以选择 replace 模式,更新表中的所有字段。对于oracle 租户,目前只有 insert 行为。 出于性能考虑,写入采用 batch 方式批量写,当行数累计到预定阈值时,才发起写入请求。

下面是 oceanbasev10writer 的配置示例。

"writer": {

"name": "oceanbasev10writer",

"parameter": {

"obWriteMode": "insert",

"column": [

"*"

],

"preSql": [

"truncate table bmsql_oorder"

],

"connection": [

{

"jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",

"table": [

"bmsql_oorder"

]

}

],

"username": "tpcc",

"password":"********",

"batchSize": 512,

"writerThreadCount":10,

"memstoreThreshold": "0.9"

}

}

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论