
一、什么是canal
官网的介绍
canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

Canal 是阿里巴巴开源的一套分布式数据库同步系统,目前主要支持 MySQL、RDS。Canal 的主要原理是伪装成 MySQL 的 Slave 节点,监听 MySQL 主库的 binlog 文件,根据 binglog 将数据库事件发送到 MQ 中,消费端可以订阅 MQ 中的消息。为了方便 Canal 的运维人员,阿里还提供了 Canal Admin 这个运维平台,使用户可以快速和安全的操作。
二、canal能做什么
基于日志增量订阅&消费实现数据同步,canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
三、如何搭建canal
3.1 首先有一个MySQL服务器
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
3.1.1 在MySQL中需要创建一个用户,并授权:
-- 创建用户,用户名:canal,密码:canal@123456
create user canal identified by 'canal@123456';
-- 为canal用户赋予replication权限,*.*标识所有库
grant SELECT,REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal@123456';
-- 修改完毕立即生效
flush privileges;
3.1.2 使用命令查看数据库是否开启binlog模式:
show variables like 'log_%';
显示如下图:

log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。
若binlog模式关闭,则在MySQL配置文件my.cnf设置如下信息:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
修改完配置文件之后,重启MySQL,使用命令查看是否打开binlog模式。

查看binlog日志文件列表:

查看当前正在写入的binlog文件:

3.2 安装canal
官网下载地址:
https://github.com/alibaba/canal/releases
以v1.1.5-alpha-2版本为例

1.linux服务器下载:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
2.创建文件夹:mkdir usr/local/canal
3.解压文件:tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz.tar.gz -C usr/local/canal/
解压后如下图:

打开配置文件conf/example/instance.properties,配置信息如下:
vi conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量canal.instance.master.position=154# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=备注:
canal.instance.filter.regex 配置规则
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表:canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
4.启动canal服务
window: bin/startup.bat
linux: bin/startup.sh
四、Java客户端操作
4.1 在pom.xml中添加canal的jar包
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
4.2 编写测试类
import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;import com.alibaba.otter.canal.client.*;public class CanalTest {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.34",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEntryCount = 1200;while (emptyCount < totalEntryCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {emptyCount = 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认}System.out.println("empty too many times, exit");}catch (Exception e){//connector.rollback(batchId); 处理失败, 回滚数据}finally {connector.disconnect();}}private static void printEntry( List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());//此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中}}}}private static void printColumn( List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}}
测试结果示例:
================》; binlog[mysql-bin.000003:409047] , name[jq_test,user] , eventType : UPDATE------->; beforeid : 22 update=falsenick_name : 米熊 update=falsephone : update=falsehead_img_url : https://wx.qlogo.cn/mmopen/Q/132 update=falsegender : 2 update=falseapplet_open_id : oY0le5V-YzahRYmY5PWMfo update=falseip_addr : update=falseuser_type : 1 update=falsecreate_time : 2020-03-25 21:21:26 update=falseupdate_time : update=falseunion_id : update=falsepublic_open_id : update=false------->; afterid : 22 update=falsenick_name : 。米哈 update=truephone : update=falsehead_img_url : https://wx.qlogo.cn/mmopen/Q/132 update=falsegender : 2 update=falseapplet_open_id : oY0le5V-YzahRYmY5PWMfo update=falseip_addr : update=falseuser_type : 1 update=falsecreate_time : 2020-03-25 21:21:26 update=falseupdate_time : update=falseunion_id : update=falsepublic_open_id : update=false




