异构数据库离线数据同步工具翘楚——DataX


https://github.com/alibaba/DataX (这是阿里巴巴 DataX 项目的 GitHub 地址)


使用 Python 脚本驱动 DataX


import os  # runs the DataX launcher through the system shell
import re  # sanitises table names used in per-table job file names
import threading  # one worker thread per table
import time  # measures how long each DataX run takes

import pandas as pd  # loads the JSON job template and queries table metadata

from db_conn import Db_conn as db  # project-local database connection helpers

# Platform the jobs run on: 'windows' or 'linux'.
plat_form = 'windows'
# Supported database types: [SqlServer, MySQL, Oracle].
reader = 'sqlserver'  # type of the database data is read from
writer = 'oracle'  # type of the database data is written to
# Aliases of the business databases. The reader alias selects which
# connection helper is called (see _get_reader_conn).
reader_db = ''  # alias of the business database to read from
writer_db = ''  # alias of the business database to write to
# Table pairs to synchronise, given as a list of dicts. An empty 'splitPk'
# means "use the first column of the reader table".
tables = [
    {'reader_table': '', 'writer_table': '', 'splitPk': ''},
    {'reader_table': '', 'writer_table': '', 'splitPk': ''},
    {'reader_table': '', 'writer_table': '', 'splitPk': ''},
]


def get_db_info(plat_form, dbname):
    """Return the ETL_DB_INFO row(s) describing the database alias *dbname*.

    Args:
        plat_form: 'windows' uses the outer-network BI connection; any other
            value uses the inner-network one.
        dbname: value matched against the DB_ALIAS column.

    Returns:
        A DataFrame with the matching rows of ETL_DB_INFO.

    Raises:
        LookupError: if no row matches *dbname*.
    """
    if plat_form == 'windows':
        BI_conn = db.get_BI20_outer_conn()  # outer-network IP on Windows
    else:
        BI_conn = db.get_BI20_inner_conn()  # inner-network IP on Linux
    # NOTE(review): dbname is interpolated straight into the SQL text. It
    # comes from the module-level configuration here; switch to a bound
    # parameter if it can ever come from outside.
    sql = f"""SELECT * FROM ETL_DB_INFO WHERE DB_ALIAS = '{dbname}'"""
    db_info = pd.read_sql(sql, BI_conn)
    if db_info.empty:
        # Fail here with a clear message instead of a KeyError at .loc[0, ...].
        raise LookupError(f'no ETL_DB_INFO row found for DB_ALIAS {dbname!r}')
    return db_info


def _get_reader_conn(reader_db):
    """Return the source-database connection selected by *reader_db*."""
    if reader_db == 'a':
        return db.get_ERP_Re1_outer_conn()
    if reader_db == 'b':
        return db.get_ERP业务242_2017_outer_conn()
    if reader_db == 'c':
        return db.get_ERP业务242_2016_outer_conn()
    # Previously an unknown alias left the connection unbound (NameError).
    raise ValueError(f'unsupported reader_db: {reader_db!r}')


def _safe_name(table_name):
    """Replace characters that are unsafe in a file name with '_'."""
    return re.sub(r'[^\w.-]', '_', table_name)


def update_json(plat_form, reader, writer, reader_db, writer_db,
                reader_table, writer_table, splitPk):
    """Fill the DataX job template for one table pair and run DataX on it.

    The template '<Reader>2<Writer>.json' is read from the directory of this
    script. The filled-in job is written to a separate file per table pair,
    so concurrently running threads do not overwrite each other's job.

    Args:
        plat_form: 'windows' or 'linux'; selects the BI connection and the
            DataX launch command.
        reader: reader database type, used to build the template file name.
        writer: writer database type, used to build the template file name.
        reader_db: alias of the source database.
        writer_db: alias of the target database.
        reader_table: table to read from.
        writer_table: table to write to.
        splitPk: split key for the reader; the first column is used if empty.

    Raises:
        ValueError: if *reader_db* is not a known alias.
        LookupError: if a database alias has no ETL_DB_INFO row.
    """
    current_path = os.path.dirname(os.path.abspath(__file__))
    template_stem = reader.lower().title() + '2' + writer.lower().title()
    # os.path.join picks the separator of the running platform.
    json_file = os.path.join(current_path, template_stem + '.json')
    job_file = os.path.join(
        current_path,
        f'{template_stem}_{_safe_name(reader_table)}'
        f'_to_{_safe_name(writer_table)}.json',
    )

    conn = _get_reader_conn(reader_db)

    read_df = pd.read_json(json_file)
    job_content = read_df.to_dict()['job']['content'][0]
    reader_parameter = job_content['reader']['parameter']  # reader part to edit
    writer_parameter = job_content['writer']['parameter']  # writer part to edit

    reader_dbinfo = get_db_info(plat_form, reader_db)
    writer_dbinfo = get_db_info(plat_form, writer_db)

    # 'where 1=0' returns no rows, so only the column names are fetched.
    columns = list(
        pd.read_sql(f'select * from {reader_table} where 1=0', conn).columns
    )

    # The reader's jdbcUrl is set as a list, the writer's as a plain string.
    reader_parameter['connection'][0]['jdbcUrl'] = [reader_dbinfo.loc[0, 'DB_JDBC']]
    reader_parameter['username'] = reader_dbinfo.loc[0, 'DB_UNAME']
    reader_parameter['password'] = reader_dbinfo.loc[0, 'DB_PWD']
    reader_parameter['connection'][0]['table'] = [reader_table]
    reader_parameter['column'] = columns
    # Without an explicit splitPk, fall back to the table's first column.
    reader_parameter['splitPk'] = splitPk if splitPk else columns[0]

    writer_parameter['connection'][0]['jdbcUrl'] = writer_dbinfo.loc[0, 'DB_JDBC']
    writer_parameter['username'] = writer_dbinfo.loc[0, 'DB_UNAME']
    writer_parameter['password'] = writer_dbinfo.loc[0, 'DB_PWD']
    writer_parameter['connection'][0]['table'] = [writer_table]
    writer_parameter['column'] = columns

    # The parameter dicts edited above are read back out of read_df here.
    # Writing to job_file (not the template) keeps the template reusable and
    # avoids a race between threads.
    read_df.to_json(job_file)

    # Run DataX. The job path is quoted so directories with spaces work.
    time1 = time.time()
    if plat_form == 'windows':
        os.system('chcp 65001')  # UTF-8 code page, avoids garbled logs on Windows
        os.system(f'python D:\\datax\\bin\\datax.py "{job_file}"')  # local Windows run
    else:
        # NOTE(review): 'home/Python/...' is a relative path; confirm whether
        # '/home/Python/...' was intended.
        os.system(f'python3 home/Python/datax/bin/datax.py "{job_file}"')  # Linux server run
    print(f'表{reader_table}执行完毕!耗时', time.time() - time1, 's')


# Start one worker thread per configured table pair.
for table in tables:
    threading.Thread(
        target=update_json,
        args=(
            plat_form,
            reader,
            writer,
            reader_db,
            writer_db,
            table['reader_table'],
            table['writer_table'],
            table['splitPk'],
        ),
    ).start()

文章转载自龙小马的数字化之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




