Canal 简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,不支持全量已有数据同步。由于采用了 binlog 机制,Mysql 中的新增、更新、删除操作,对应的 Elasticsearch都能实时新增、更新、删除。
MySQL 主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。 MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)。 MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。
Canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。 canal 解析 binary log 对象(原始为 byte 流)。
MySQL 配置
开启 binlog
Canal 使用 MySQL 的 binlog 机制实现数据动态变化监测,所以需要 Mysql 提前配置 binlog。
编辑 etc/my.cnf 的 mysqld 下添加如下配置:
server-id = 7777
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
然后,重启一下 Mysql 以使得 binlog 生效。
systemctl restart mysqld.service
检查 binlog 是否开启:
[root@mysql-5 ~]# mysqladmin variables -uroot@123456 | grep log_bin
| log_bin | ON
创建用户
创建用户 canal,密码 canal,并授予 MySQL slave 的权限:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下载 Canal 压缩包
下载地址: https://github.com/alibaba/canal/releases

场景一:同步 MySQL 数据到 Kafka
Canal Server 配置
修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:
#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配所有表,则每个表都会发送到各自表名的topic
# 例如数据库名为school,表名为student,则topic名字为school_student
canal.mq.dynamicTopic=.*\\..*
修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:
# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
#Kafka集群地址
kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092
启动 canal server:
bin/startup.sh
Kafka 消费数据
往 MySQL 数据库中插入一条数据:
mysql> insert into student values(8,'tonny',20);
Query OK, 1 row affected (0.01 sec)
Kafka 消费者可以成功消费到新插入的数据:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic student_school --from-beginning
{"data":[{"id":"8","name":"tonny","age":"20"}],"database":"school","es":1617512377000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"student","ts":1617512405579,"type":"INSERT"}
场景二:同步 MySQL 数据到 Elasticsearch
Canal Server 配置
修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:
#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:
# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
启动 canal server:
bin/startup.sh
Canal-adapter 配置
修改启动器配置
修改conf/application.yml文件:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# Canal Server的访问地址(在前面的conf/canal.properties中默认配置了),保持默认
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# 连接的数据库信息
srcDataSources:
defaultDS:
# jdbc:mysql://<MySQL地址>:<端口>/<数据库名称>?useUnicode=true
url: jdbc:mysql://192.168.1.14:3306/school?useUnicode=true
username: canal #用户名
password: canal #密码
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: es7 #去读取的conf/es7目录下的yml文件
hosts: 192.168.1.171:9300 #Elasticsearch地址
properties:
mode: transport
cluster.name: cr7-elastic #Elasticsearch集群名字
编辑适配器表映射文件
编辑 conf/es7/mytest_user.yml文件,注意指定对应库表 id 为 Elasticsearch中 的 _id,否则会空指针异常:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example #cannal的instance或者MQ的topic
groupId: g1 #对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: mytest_user #在Elasticsearch实例中所创建的索引的名称
_id: _id #需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id
sql: "select t.id as _id,t.id,t.name,t.age from student t" #SQL语句,用来查询需要同步到Elasticsearch中的字段
commitBatch: 3000 #提交批大小
启动 canal-adapter 启动器:
bin/startup.sh
启动成功后查看 canal adapter 日志会有如下内容:
[root@canal canal-adapter]# tail -f /usr/local/canal-adapter/logs/adapter/adapter.log
#输出结果:
......
2021-04-04 13:32:58.625 [http-nio-8081-exec-10] INFO com.alibaba.otter.canal.adapter.launcher.rest.CommonRest - #Destination: example sync on
2021-04-04 13:32:58.631 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-04 13:32:58.677 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
......
Elasicsearch 获取数据
先创建索引对应的 mapping,否则 canal 会无法识别索引,会报写入错误:
PUT mytest_user
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"age": {
"type": "integer"
}
}
}
}
往 MySQL 数据库中插入一条数据:
mysql> insert into student values(2,'chengzw',18);
Query OK, 1 row affected (0.01 sec)
查看 canal-adapter 日志:
2021-04-04 13:36:12.022 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"name":"chengzw","age":18}],"database":"school","destination":"example","es":1617514543000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"student","ts":1617514571559,"type":"INSERT"}
Affected indexes: mytest_user
查看 Elasticsearch 可以搜索到新的数据:
GET mytest_user/_search
#返回结果:
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "mytest_user",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"name" : "chengzw",
"age" : 18
}
}
]
}
}
如果删除 MySQL 数据库数据,Canal 也会将对应 Elasticsearch 上的文档删除。
Canal-adapter 管理 REST 接口
# 查询所有订阅同步的canal instance
[root@canal canal]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}]
# 数据同步关闭
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}
# 数据同步开启
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
{"code":20000,"message":"实例: example 开启同步成功"}
场景三:通过 Java API 获取 Canal 数据
Canal Server 配置
修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:
#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
引入 maven 依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
Java 代码
package com.chengzw;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* Canal 客户端
* @author 程治玮
* @since 2021/4/4 12:31 下午
*/
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.175",
11111), "example", "", "");
//一次性获取数据的数量
int batchSize = 1000;
int emptyCount = 0;
try {
//打开连接
connector.connect();
//订阅全部数据库和全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
//指定连续获取空数据最大的次数,达到这个次数断开连接
int totalEmptyCount = 2000;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries()); //获取数据信息
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
//打印格式:binlog[mysql-bin.000001:5430] , name[school,student] , eventType : INSERT
/// binlog文件名,偏移量,数据库名,表名,操作
System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//打印数据
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) { //删除数据
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) { //插入数据
printColumn(rowData.getAfterColumnsList());
} else { //更新数据
System.out.println("------- before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("------- after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
// 打印数据每个字段的更新情况
// id : 8 update=false
// name : tonny update=false
// age : 20 update=false
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
往 MySQL 数据库中插入一条数据:
mysql> insert into student values(14,'mali',20);
Query OK, 1 row affected (0.00 sec)
查看控制台输出结果:
empty count : 1
empty count : 2
empty count : 3
================ binlog[mysql-bin.000001:12924] , name[school,student] , eventType : INSERT
id : 14 update=true
name : mali update=true
age : 20 update=true
参考链接
https://github.com/alibaba/canal/wiki/ClientAdapter https://github.com/alibaba/canal/wiki/Sync-ES https://github.com/alibaba/canal/wiki/ClientExample https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ https://help.aliyun.com/document_detail/135297.html#title-3b7-i1b-4n3 https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart




