暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

DATAX的一个完整实践记录

一笑而起 2021-01-07
2133

缘起:

近期生产系统有从其他系统获取数据进行功能判断的依据。基于个人坚持的系统之间尽量解耦的原则,果断建议了etl同步方式。
因为之前对kettle+taskctl方式比较熟悉,也对商业的DataPipeline有过一定的接触和了解,因此这次果断选择了datax。
一个是基于对datax的了解,一个是基于要丰富自己能力(*^▽^*)。

记录:

一、datax安装部署

https://github.com/alibaba/DataX/blob/master/userGuid.md
看这个官方的文档就足够了。
简单说就是开箱即用。非常方便。


注意:python版本 建议2.6
python在3.0版本进行了语句的调整,和2.*的版本差异比较大,导致2.*的python是无法在环境是3.0的机器上运行的。


下附suse或者readhat的安装python 2.7的方法:
wget https://www.python.org/ftp/python/2.7.14/Python-2.7.14.tgz # Download
tar xvfz Python-2.7.14.tgz # unzip
cd Python-2.7.14 # go into directory
./configure
make # build
su # or 'sudo su' if there is no root user
make altinstall

二、mysql到mysql的数据同步

这块分两部分介绍

  1. datax的同步脚本,供学习者学习;

  2. 针对我的这个业务场景的补偿机制,供参考;

{"job": {
"setting": {
"speed": {
"channel": 4,
"record":-1,
"byte":-1,
"batchSize": 2048
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "***",
"password": "***",
"column": [
"columnA",
"columnB",
"columnC"
],
"splitPk": "columnA",
"where": "columnB > date_sub(now(),interval 1 hour)",
"connection": [
{
"table": [
"SOURCE_TABLE"
],
"jdbcUrl": [
"jdbc:mysql://IP:3306/xaxq?useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "replace",
"username": "***",
"password": "***",
"column": [
"columnA",
"columnB",
"columnC"
],
"session": [
"set session sql_mode='ANSI'"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://IP:3306/xasi?useUnicode=true&characterEncoding=utf8",
"table": [
"Target_table"
                                ]} ]} }} ]    }}                               
注:
1. column 列可以不用列出,可以用*替代;
2. splitPk 是划分并行的依据,建议是主键;
3. where 条件可以有效地进行业务增量同步;
4. querySql 当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项,这个可以用来进行多表筛选后的同步,比如一个复杂sql的结果;
5. writeMode 建议用replace,是insert or update的组合;
补偿机制的说明:
后面的crontab会看到,我假设是10分钟同步一次的话,我的增量范围是1小时的变化值。
这样,假设因为网络原因等导致任务失败,还会有5次尝试机会。
如果6次都失败,那就得人工介入,或者触发更大范围的补偿机制进行自愈。
前提:
根据具体的业务,我的是增量数据很小,因此可以如此进行。

三、shell脚本和监控:

这块看点:

  1. datax的调度方式;

  2. shell中的$?

  3. 日志和监控建议


datax也可以对接airflow等开源调度工具,不过此次对我来说,就一个业务的两张表,因此先看看下面的方式吧。

#!/bash/bin
source /etc/profile
DATE=`date '+%Y-%m-%d %H:%M:%S'`
echo $DATE >>/data/datax/log/logname.log
python /data/datax/bin/datax.py /data/datax/mysql_2_mysql.json
if [ $? -eq 0 ];then
echo "success table1" >>/data/datax/log/logname.log
echo
else
echo "failed table1" >>/data/datax/log/logname.log
fi
sleep 3
/data/datax/python/Python-2.7.14/python /data/datax/datax/bin/datax.py /data/datax/increment/mysql_audit_item_increment.json
if [ $? -eq 0 ];then
echo "success table2" >>/data/datax/log/logname.log
else
echo "failed table2" >>/data/datax/log/logname.log
fi


注:
linux提供$?特殊变量来保存最后一条命令执行结束的退出状态。执行完一条命令后,立即执行echo$?,可以查看最后一条命令的退出状态值。
正常的情况下,命令成功执行完成的退出状态是0,如果非0,则命令执行有错。
该命令可以用于检查命令是否正确执行。
退出状态码最高是255,一般自定义的代码值为0~255,如果超出255,则返回该数值被256除了之后的余数。
监控:
因为有了较好的补偿机制,所以,此次监控和告警就战略性的后延一段时间。
只说目前做了的和这块的后期监控建设思路:
日志监控:
1. datax有完整的日志,比较好的日志分类。但是还是内容比较多,上述脚本可以参考,将结果单独输出。匹配日志,如果有failed,则说明执行失败了。
对接监控平台:
1. 通过日志遍历内容,检索failed,对接监控平台的api接口,产生告警;
2. 修改上面的脚本,产生的错误信息,记录到数据库的某个log表中,通过业务层的监控工具或者自建的工具,实现告警;

四、crontab 调度

此处看点:

shell脚本可以执行成功,但是crontab的时候,就不一定了。

crontab有一个缺点,就是它总是不会缺省的从用户profile文件中读取环境变量参数

因此shell 脚本要添加 source /etc/profile

crontab -e # 编辑
*/10 * * * * /bin/sh ***.sh >/dev/null 2>&1

五、应用表的调用设计

  1. 如果同步的内容很少,则可以随便点稍微。当然最终还是要看业务的重要性和cpm等;

  2. 同步的内容作为一个独立的库或者表,做一层缓冲层。尽可能解耦两个系统的关联,尽可能加快同步的过程。通过mq或者其他业务触发机制,定期或者不定期的从缓冲表中获取数据到本系统中。

文章转载自一笑而起,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论