大家好,我是一安,之前介绍《实际开发中,如何保证数据库和缓存双写一致性》一文中,最后有提到使用Canal监听Mysql,今天就完整的介绍一下。
前言
在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。
一般遇到这种情况下,在实时性要求不高的场景我们有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。
canal简介
Canal是阿里巴巴旗下的一款开源项目,纯Java开发,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括
数据库镜像 数据库实时备份 索引构建和实时维护(拆分异构索引、倒排索引等) 业务 cache 刷新 带业务逻辑的增量数据处理
工作原理
MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看) MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) canal 解析 binary log 对象(原始为 byte 流)
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
环境准备
小编这里演示canal并推送mysql消息到rabbitmq,有兴趣的可以参考,修改接收方,比如kafka、ES
mysql和rabbitmq安装
安装mysql和rabbitmq,之前有专门介绍两个安装
这里直接使用之前搭的环境,使用过程中遇到一点小问题,下面会提到
开启mysql的bin-log日志
小编之前的搭的主从复制,所以是小编这里是开启的,如果你是自己搭的环境,检查是否开启,没有的话开启即可,命令如下:
1.查看mysql是否开启bin-log日志
SHOW VARIABLES LIKE '%log_bin%'
2.没有开启的话,增加如下配置重启mysql
server-id=1
log-bin=mysql-bin
binlog-format=ROW
创建canal用户获取bin-log日志
进入mysql执行如下命令即可:
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by "canal";
flush privileges;
rabbitmq新建一个topic类型交换机canal.topic,然后新增队列:canal.topic, 绑定canal.topic交换机, RoutingKey:canal.topic
canal安装
docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -p 11111:11111 -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /mydata/canal-server/conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal-server/conf/
注意:
容器里配置复制到外面是为了后面挂载,以后修改配置不需要每次都进入容器内部 小编这里一开始未复制出容器里配置,而且直接外面建了两个空的配置文件,然后追加了配置信息,启动一直报空指针 2022-08-23 11:06:51.844 [main] ERROR com.alibaba.otter.canal.deployer.CanalLauncher - ## S>>omething goes wrong when starting up the canal Server:
java.lang.NullPointerException: null
at com.alibaba.otter.canal.deployer.CanalStarter.start(CanalStarter.java:68) ~[can>>al.deployer-1.1.5.jar:na]
at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:117) ~[canal.deployer-1.1.5.jar:na]
查看源码部分后,启动需要加载很多配置,而小编使用的自己建的配置文件,只有部分配置
修改配置
instance.properties
# 不能和mysql重复
canal.instance.mysql.slaveId=2
# 使用mysql的虚拟ip和端口
canal.instance.master.address=192.168.5.128:3307
# binlog日志名称(非必选)
canal.instance.master.journal.name=mysql-bin.000006
# mysql主库链接时起始的binlog偏移量(非必选
canal.instance.master.position=4
# 使用已创建的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# rabbitmq中配置的绑定的 routingkey
canal.mq.topic=canal.topic
canal.properties
目前rabbitMQ没有支持端口配置,默认是5672
canal.serverMode = rabbitMQ
rabbitmq.host = 192.168.5.128
rabbitmq.virtual.host = /
# rabbitmq中新建的Exchange
rabbitmq.exchange = canal.topic
rabbitmq.username = guest
rabbitmq.password = guest
#exchange的模式
rabbitmq.deliveryMode = topic
重新创建canal容器并挂在配置文件
1.先停掉原来启动的
docker stop canal
2.删除原来的容器
docker rm canal
3.重新创建容器并挂载配置文件
docker run --name canal -p 11111:11111 -v /mydata/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /mydata/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5
至此,环境准备完成,开始测试。
演示
新建数据库
新建测试数据库:demo
create database demo;
问题出现了,mysql,rabbitmq,canal都启动正常,但rabbitmq收不到消息,小编这里找了一个小时终于找到了,是之前搭建mysql环境配置的问题,原来配置主从复制时指定了需要同步的数据库,注释掉就可以了(自己刨的坑自己填,大家要注意自己的环境)
#需要同步的数据库
#binlog-do-db=demo_0
#binlog-do-db=demo_1
看一下rabbitmq
{"data":null,"database":"","es":1661232453000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'","sqlType":null,"table":"","ts":1661232453785,"type":"QUERY"}
新增表
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(30) NOT NULL DEFAULT '',
`address` varchar(200) DEFAULT NULL,
`email` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
看一下rabbitmq
{"data":null,"database":"demo","es":1661232589000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `user` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `name` varchar(30) NOT NULL DEFAULT '',\n `address` varchar(200) DEFAULT NULL,\n `email` varchar(100) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"user","ts":1661232589258,"type":"CREATE"}
新增数据记录
insert into user (name,address,email) values ('lisi','beijing','1234');
看一下rabbitmq
{"data":[{"id":"1","name":"lisi","address":"beijing","email":"1234"}],"database":"demo","es":1661232730000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(30)","address":"varchar(200)","email":"varchar(100)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"address":12,"email":12},"table":"user","ts":1661232730878,"type":"INSERT"}
数据修改,删除,字段变更就不一一测试了,留给大家一些操作空间
。
Java客户端连接
注意:客户端连接需要修改配置canal.properties中canal.serverMode = tcp
1.引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
2.代码示例
public static void main(String[] args){
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.5.128", 11111), "example", "", "");
canalConnector.connect();
//订阅所有消息
canalConnector.subscribe(".*\\..*");
// 只订阅test数据库下的所有表
//canalConnector.subscribe("test");
//恢复到之前同步的那个位置
canalConnector.rollback();
for(;;){
//获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
Message message = canalConnector.getWithoutAck(100);
//获取消息id
long batchId = message.getId();
if(batchId != -1){
printEnity(message.getEntries());
//提交确认
//canalConnector.ack(batchId);
//处理失败,回滚数据
//canalConnector.rollback(batchId);
}
}
}
private static void printEnity(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){
continue;
}
try{
// 序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println(rowChange.getEventType());
switch (rowChange.getEventType()){
//如果希望监听多种事件,可以手动增加case
case INSERT:
// 表名
String tableName = entry.getHeader().getTableName();
System.out.println("表名:"+tableName);
//测试users表进行映射处
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column c:afterColumnsList){
System.out.println("字段:"+c.getName()+",值:"+c.getValue());
}
System.out.println("插入的数据是:" + afterColumnsList);
break;
case UPDATE:
List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
System.out.println("更新的数据是:" + afterColumnsList2);
break;
case DELETE:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
System.out.println("被删除的数据是:" + beforeColumnsList);
break;
default:
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
3.输出
INSERT
表名:user
字段:id,值:2
字段:name,值:lisi
字段:address,值:beijing
字段:email,值:1234
插入的数据是:
[index: 0 sqlType: 4 name: "id" isKey: true updated: true isNull: false value: "2" mysqlType: "int(11)"
, index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "lisi" mysqlType: "varchar(30)"
, index: 2 sqlType: 12 name: "address" isKey: false updated: true isNull: false value: "beijing" mysqlType: "varchar(200)"
, index: 3 sqlType: 12 name: "email" isKey: false updated: true isNull: false value: "1234" mysqlType: "varchar(100)"]
号外!号外!
如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!






