同步工具是南大通用数据技术股份有限公司开发的用于同步指定数据库数据到目标数据库的软件工具。支持从不同数据库到8a MPP数据库的增量数据同步,目前支持的源端数据库包括:
|
数据库类型 |
版本 |
|
Oracle(RAC) |
10g、11g、12c、18c、19c |
|
GBase
8t/GBase 8s |
全部版本 |
|
MySQL |
5.1~5.7、8.0 |
|
Informix |
11.5FC5及以上版本 |
|
SQLServer |
SQLServer2008、2012 |
同步工具在结构上包括3个部分:
1、管理组件:负责协同读组件和写组件
2、数据读取组件:负责从源端数据库读取增量数据,同时发送到kafka
3、数据写组件:负责从kafka读取增量数据,将数据保存到目标端数据库
在同步数据到8aMpp的场景下,只需要管理组件和读组件,而写库的动作是通过8aMPP自带的consumer实现的,kafka作为中间件,用于缓存读组件和consumer之间待同步数据,以解耦读组件与consumer,实现生产者消费者模式。
下面就以oracle到8a作为示例说明一下RTSync的使用
一、配置Oracle 的logminer
1.
安装LogMiner:如果已安装,跳过此步骤。如果没有,使用sqlplus执行以下命令(这两个脚本必须以DBA用户身份运行):
|
SQL> @/opt/oracle/product/OraHome/rdbms/admin/dbmslm.sql SQL> @/opt/oracle/product/OraHome/rdbms/admin/dbmslmd.sql |
2.
开启Oracle归档日志:如果已开启,跳过此步骤。如果没有,使用conn /as
sysdba登录要开启的Oracle数据库执行以下命令:
|
1) 查看是否处于归档模式:SELECT log_mode FROM v$database;如果现实数据库为归档模式(ARCHIVELOG),则后续步骤不用做。 2) 开启归档模式: --关闭数据库 SQL> SHUTDOWN
IMMEDIATE; --启动到mount状态 SQL> STARTUP
MOUNT; --开启归档日志 SQL> ALTER
DATABASE ARCHIVELOG; --开启数据库 SQL> ALTER
DATABASE OPEN; --如果为pdb库,执行 SQL> ALTER
PLUGGABLE DATABASE 【库名】 OPEN; |
3.
开启补充日志:如果已开启,跳过此步骤。如果没有,使用conn /as sysdba登录要开启的Oracle数据库执行以下命令:
|
1) 查看是否开启:select supplemental_log_data_min from
v$database;(YES代表已经开启,无需再次激活) 2) 开启补充日志: --全列日志 SQL> alter
database add supplemental log data(all) columns; --最小补充日志 SQL> alter
database add supplemental log data; --主键补充日志 SQL> alter
database add supplemental log data (Primary key) columns; --唯一索引补充日志 SQL> alter
database add supplemental log data (unique) columns; --强制日志 SQL> alter
database force logging; |
4.
创建同步工具使用的Oracle用户并授权:如果已有dba权限的用户,跳过此步骤。如果没有,使用conn /as sysdba登录要创建用户的Oracle数据库执行以下命令(假设用户名为test,密码为testpassword):
|
--创建用户test SQL> create user test identified by testpassword; --为用户test授权 SQL> grant connect, resource, dba to test; |
如果无法授予用户dba权限,可以授予用户如下使用logminer的最小权限:
|
--为用户test授权 SQL> grant connect to test; SQL> grant execute on dbms_logmnr to test; SQL> grant select any transaction to test; SQL> grant create table to test; SQL> grant select on v_$logmnr_contents to test; SQL> grant select on v_$log to test; SQL> grant select on v_$archived_log to test; SQL> grant select on v_$database to test; SQL> grant select on v_$logfile to test; SQL> grant select on gv_$log to test; SQL> grant select on gv_$archived_log to test; SQL> grant select on gv_$logfile to test; |
5.
如果使用Oracle12c以上版本的容器模式,假设cdb用户为c##test,pdb用户为test;
1)
需要为c##test用户创建一个用户角色c##test_logminer_privs,并将角色赋权给c##test用户
|
SQL> create role c##test_logminer_privs
container=all; SQL> grant connect, create table, create session, execute_catalog_role, select any transaction, logmining, set container, select any dictionary to c##test_logminer_privs
container = all; SQL> grant select on system.logmnr_col$ to
c##test_logminer_privs container = all; SQL> grant select on system.logmnr_obj$ to
c##test_logminer_privs container = all; SQL> grant select on system.logmnr_user$ to c##test_logminer_privs
container = all; SQL> grant select on system.logmnr_uid$ to
c##test_logminer_privs container = all; SQL> grant select on v_$database to
c##test_logminer_privs container = all; SQL> grant select_catalog_role to
c##test_logminer_privs container = all; SQL> grant c##test_logminer_privs to c##test container = all; |
2)
为pdb用户test赋如下权限
|
SQL> grant connect, select any table to test; |
二、解压RTSync安装包
将安装光盘中的同步工具的安装包GBase_RTSync_8.6.13.1.tar.gz拷贝到服务器上,例如:172.16.3.17,以Redhat为例,将其放置到/opt/目录下。需要执行以下命令完成解压步骤:
1、进入安装包所在目录,执行命令:
|
# cd /opt/ |
2、解压自动安装包,执行命令:
|
# tar -zxvf GBase_RTSync_8.6.13.1.tar.gz |
3、进入同步工具的安装目录
|
# cd /opt/RTSyncInstall_8.6.13.1_build1/RTSync |
三、配置config_task.xml 文件
1、进入同步工具conf配置文件目录:
|
#cd /opt/RTSyncInstall_8.6.13.1_build1/RTSync/conf |
2、将sample目录下的config_task_ora_8a.xml复制到conf目录,
|
#cp ../sample/config_task_ora_8a.xml
config_task.xml |
3、编辑config_task.xml文件
|
vim config_task.xml |
配置config_task.xml文件参数,详细如下:
|
<?xml
version="1.0" encoding="UTF-8" ?> <servers> <server id="server1"
syncMode="increment" dataFormatType="PUREDATA"
mqType="kafka" queueName="orato8aMQ" isHighAvailable="false" dataRecoveryMode="file"> <manager ip="172.16.3.19" #管理端部署服务器 port="9432" heartbeatPort="9000" httpPort="8080" heartbeatTimeOut="6" isTableHotPatch="true" hotPatchTimeOut="30"/> <source
ip="172.16.3.19" #读端部署服务器 path="/opt/RTSyncInstall_8.6.13.1_build1/RTSync"
#安装路径 queueSize="1000" readParseAdapter="adapter" user="root"
#同步工具读端所在服务器用户 password="123123"
#同步工具读端所在服务器用户密码 openMonitor="true" monitorInterval="300" dataFormatParallel="10" rpcPort="9191" sshPort="22" dbObjToUpperCase="true" /> <mappings> <source-target
id="0524_bht"> <db> <sourcedb #此标签为同步的源库配置项
charset="GB18030" type="ORACLE" #值必须为ORACLE,不用修改
startLSN="0"
maxRecordsPerRead="200"
maxSizeOfPerRecord="1024"
timeOut="2" driver="oracle.jdbc.OracleDriver" #驱动名,不需要修改 url="jdbc:oracle:thin:@//10.122.227.69:1522/orcl" #连接源端的jdbc url, user="test" #数据库用户 password="test" ##数据库用户密码
catalog="TEST" #配置成需要同步的库
timestampWithFraction="true"
oracleScnStep="5000"
fetchSize="1000"
allowPrimaryKeyNull="true"
showCommitTime="true"> </sourcedb> <targetdb #此标签为同步的目标库配置项
charset="UTF8" type="GCLUSTER" #必须为GCLUSTER
commitSize="100"
queueSize="20000"
timeOut="30"
user="gbase" #数据库用户
password="gbase20110531" #数据库密码
driver="com.gbase.jdbc.Driver" catalog="rtsync2"
#目标库 url="jdbc:gbase://172.16.3.86:5258/rtsync2?useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true&connectTimeout=0&socketTimeout=0" > </targetdb> <tables
isInclude="true"> #此标签为需要同步的表配置 <table deleteMode="NORMAL" sourceTableName="T1" sourcePkColName="A"
targetTableName="t1" targetPkColName="a"> # sourceTableName 为源库的表名(Oracle注意大小写)、targetTableName为目标库表名(小写) </table> </tables> </db> </source-target> </mappings> </server> </servers> |
进入同步工具conf配置文件目录:
cd /opt/RTSyncInstall_8.6.13.1_build1/RTSync/conf
注意:config_kafka_[queueName].properties
文件名中的queueName 必须与config_task.xml文件中的<server 标签的queueName="orato8aMQ"
值保持一致。
1、需要将conf目录下的
config_kafka_8tto8tMQ.properties修改为
config_kafka_orato8aMQ.properties文件,修改语句如下:
|
#mv config_kafka_orato8tMQ.properties config_kafka_orato8aMQ.properties |
2、编辑config_kafka_orato8aMQ.properties文件,修改配Kafka相关连接参数
|
#vim config_kafka_orato8aMQ.properties |
只要修改以下几个参数红色字体参数:
1、
topic.name为Kafka使用的topic的名称,这个可以随意命名
topic.name= testtp
2、 topic.enable.auto.create为是否自动创建topic的开关,值为true
topic.enable.auto.create=true
3、 topic.replication.num为topic的复制数量,取值范围为大于0且小于节点数量
topic.replication.num=1
4、 bootstrap.servers为每台Kafka节点的IP和端口号;
bootstrap.servers=172.16.3.19:9092,172.16.3.119:9092
5、 zookeeper.connect为每台Zookeeper节点的IP和端口号;
zookeeper.connect=172.16.3.19:2181,172.16.3.119:2181
6、 group.id为Kafka的消费者的唯一组id;
group.id=test
以下为测试环境config_kafka_orato8aMQ.properties文件参数配置:
|
topic.name=testtp #is auto create topic name,default false topic.enable.auto.create=true #topic backup number, >0,<=brokers num topic.replication.num=1 #producer conf bootstrap.servers=172.16.3.19:9092,172.16.3.119:9092 #wait count to commit kafka.batch.commit.count=1000 #wait time to commit kafka.batch.commit.time=300 #kafka resend times kafka.resend.max.retries=3 #kafka send result has confirm (send successed or send failed) kafka.send.issuccess=true #client send max data size. default:brokers message.max.bytes send.data.max.size=104857600 #kafka acknowledgement
mechanism :0,1,all kafka.acks=all #kafka.producer.paramers=request.timeout.ms=60000;max.request.size=105857600;acks=1;compression.type=gzip kafka.producer.paramers=request.timeout.ms=30000;metadata.fetch.timeout.ms=60000;linger.ms=1 #consumer conf zookeeper.connect=172.16.3.19:2181,172.16.3.119:2181 group.id=test auto.commit.interval.ms=5000 auto.commit.offset.enable=false zookeeper.sync.time.ms=3000 zookeeper.session.timeout.ms=6000 #message max size for consumer ,suggest :fetch.message.max.bytes =
send.data.max.size=1000000 fetch.message.max.bytes=104857600 kafka.consumer.paramers= |
所有配置文件启动完成后,启动RTSync服务器,进入RTSync安装目录
1. 进入管理组件的所在目录,执行命令:
|
# cd /opt/RTSyncInstall_8.6.13.1_build1/RTSync/ |
2. 执行启动命令:
|
# sh RTSyncManagerServer.sh start |
屏幕上显示如下信息:
|
[root@node1 RTSync]# sh RTSyncManagerServer.sh start Starting RTSync 8.6.13.1_build1. Copyright (c) 2004-2019, GBase. All Rights Reserved. 请确认同步工具启动前目标端数据库已关闭触发器,否则有可能导致目标数据库数据不一致(y/n):y ***************************************** RTSync version 8.6.13.1_build1 ***************************************** 2018-07-14 03:46:50,033 同步工具管理端服务配置信息初始化完成。 STARTED [root@node1 RTSync]# 七月 14, 2018 3:46:52 上午 org.glassfish.grizzly.http.server.NetworkListener start 信息: Started listener bound to [172.16.3.19:8080] 七月 14, 2018 3:46:52 上午
org.glassfish.grizzly.http.server.HttpServer start 信息: [HttpServer] Started. 2018-07-14 03:46:52,071 获取读端元数据信息 2018-07-14 03:46:52,395 获取读端元数据信息完成 2018-07-14 03:46:52,899 元数据验证通过。 2018-07-14 03:46:54,939 读端组件正在启动中...... 2018-07-14 03:46:55,228 读端组件启动完成。 2018-07-14 03:46:55,363 读端增量同步启动开始... 2018-07-14 03:46:55,615 读端增量同步启动完成。 2018-07-14 03:46:56,993 启动异常监听服务。 2018-07-14 03:46:56,994 同步工具管理端服务启动完成。 |
8a Consumer是GBase8ampp自带的消费组件,用来同步消费 Kafka 数据到 8a MPP Cluster数据库中。
备注:目前由其它源库同步到GBase8a MPP时,采用的都是:
GBase RTSync(管理、读) + 8a Consumer(写)的方式进行环境部署。
配置步骤如下:
步骤一:修改8a mpp配置文件
修改8a mpp所有节点的/opt/gcluster/config/gbase_8a_gcluster.cnf
|
gcluster_kafka_consumer_enable=1 gcluster_lock_level=10 _gbase_transaction_disable=1 _gcluster_insert_cache_buffer_flag=1 gcluster_assign_kafka_topic_period=20 gcluster_kafka_max_message_size=1000000 gcluster_kafka_batch_commit_dml_count=100000 gcluster_kafka_local_queue_size=210000 gcluster_kafka_user_allowed_max_latency=20000 gcluster_kafka_consume_batch=100 gcluster_kafka_parallel_commit=5 gcluster_kafka_primarykey_can_be_null=1 gcluster_kafka_data_buf_size=20000 gcluster_kafka_message_format_type=PUREDATA(与RTSync配置的数据格式保持一致) |
修改8a mpp所有节点的/opt/gnode/config/gbase_8a_gbase.cnf
|
gbase_tx_log_mode=ONLY_SPECIFY_USE _gbase_transaction_disable=1 gbase_buffer_insert=1024M (有大事务时需要调大,根据8a的服务器内存情况) gbase_tx_log_flush_time=5 |
备注:修改完以上两个配置文件,则需要service gcware restart 重新启动集群服务才生效
步骤二:创建8a consumer
通过gccli 命令 登录8a mpp命令行 执行以下8a consumer相关语句
创建语句:
|
create kafka consumer test transaction topic testtp brokers ‘192.168.5.11:9092,192.168.5.12:9092’; |
说明:
1、创建consumer时 topic需要与RTSync的config_kafka_orato8aMQ.properties配置文件中的topic.name一致。
2、brokers 就是配置部署的Kafka节点信息
步骤三:启动8a consumer
启动语句:
|
#start kafka consumer test |
步骤三:监控及查询consumer同步状态
查看 consumer task 属性:
|
# show
kafka consumer test; |
从系统表中查询数据同步状态
|
select * from information_schema.kafka_consumer_status; |
gbase> select * from
information_schema.kafka_consumer_status;
+---------+---------------+---------+------------+------------+------------+----------------+---------------+-----------+
| TOPIC
| IP | STATUS | MIN_OFFSET | MAX_OFFSET | CUR_OFFSET |
PROCESS_OFFSET | COMMIT_OFFSET | EXCEPTION |
+---------+---------------+---------+------------+------------+------------+----------------+---------------+-----------+
| k1topic | 192.168.6.161 | STARTED | 0
| 0 |
0 | 0
| 0 | |
+---------+---------------+---------+------------+------------+------------+----------------+---------------+-----------+




