工作原理简介
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送 dump 协议。
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 。
- canal 解析 binary log 对象(原始为 byte 流)。
部署 CANAL
MySQL 准备
- 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载启动 CANAL
- 下载 canal,访问地址:https://github.com/alibaba/canal/releases。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
- 解压缩
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C ~/canal
解压完成后,进入 /tmp/canal 目录,可以看到如下结构。
tree -L 3 ~/canal/
- 修改配置
vi conf/example/instance.properties
# mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK ,ISO-8859-1 。 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false 。
- 启动
sh bin/startup.sh
- 查看 server 日志
vi logs/canal/canal.log</pre>
- 查看 instance 的日志
vi logs/example/example.log
- 关闭
sh bin/stop.sh
部署 RDB 适配器
RDB adapter 用于适配mysql到任意关系型数据库(需支持jdbc)的数据同步及导入 。
- 修改启动器配置: application.yml, 这里以 OceanBase 目标库为例
canal.conf:canalServerHost: 127.0.0.1:11111batchSize: 500syncBatchSize: 1000retries: 0timeout:mode: tcp # kafka rocketMQsrcDataSources:defaultDS:url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=trueusername: rootpassword: 121212canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: rdb # 指定为rdb类型同步key: oracle1 # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应properties:jdbc.driverClassName: com.mysql.cj.jdbc.Driver # jdbc驱动名, 部分jdbc的jar包需要自行放致lib目录下jdbc.url: jdbc:mysql:thin:@localhost:2883/tpccdb # jdbc urljdbc.username: root@obmysql # jdbc usernamejdbc.password: 123456 # jdbc passwordthreads: 5 # 并行执行的线程数, 默认为1
- RDB表映射文件
修改 conf/rdb/mytest_user.yml 文件:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
outerAdapterKey: oracle1 # adapter key, 对应上面配置outAdapters中的key
concurrent: true # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
dbMapping:database: mytest # 源数据源的database/shcematable: user # 源数据源表名targetTable: mytest.tb_user # 目标数据源的库名.表名targetPk: # 主键映射id: id # 如果是复合主键可以换行映射多个
# mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填id:name:role_id:c_time:test1:
导入的类型以目标表的元类型为准, 将自动进行类型转换。
- Mysql 库间镜像schema DDL DML 同步
修改 application.yml :
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
jdbc.username: root
jdbc.password: 121212
修改 conf/rdb/mytest_user.yml 文件:
dataSourceKey: defaultDS
destination: example
outerAdapterKey: mysql1
concurrent: true
dbMapping:mirrorDb: truedatabase: mytest
其中dbMapping.database 的值代表源库和目标库的 schema 名称,即两库的 schema 要一模一样。
- 启动 RDB
将目标库的jdbc jar 包放入lib文件夹。 启动 canal-adapter 启动器。
bin/startup.sh
验证 修改 mysql mytest.user 表的数据, 将会自动同步到 MySQL 的 MYTEST.TB_USER 表下面, 并会打出 DML 的 log 。




