暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Canal 详解

大飞智家 2020-10-20
3020


一、什么是canal

官网的介绍

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费


      Canal 是阿里巴巴开源的一套分布式数据库同步系统,目前主要支持 MySQL、RDS。Canal 的主要原理是伪装成 MySQL 的 Slave 节点,监听 MySQL 主库的 binlog 文件,根据 binglog 将数据库事件发送到 MQ 中,消费端可以订阅 MQ 中的消息。为了方便 Canal 的运维人员,阿里还提供了 Canal Admin 这个运维平台,使用户可以快速和安全的操作。


二、canal能做什么

基于日志增量订阅&消费实现数据同步,canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:

  1. 数据库镜像

  2. 数据库实时备份

  3. 多级索引 (卖家和买家各自分库索引)

  4. search build

  5. 业务cache刷新

  6. 价格变化等重要业务消息


三、如何搭建canal

3.1 首先有一个MySQL服务器

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

3.1.1 在MySQL中需要创建一个用户,并授权:

-- 创建用户,用户名:canal,密码:canal@123456

create user canal identified by 'canal@123456';

-- 为canal用户赋予replication权限,*.*标识所有库

grant SELECT,REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal@123456';

-- 修改完毕立即生效

flush privileges;


3.1.2 使用命令查看数据库是否开启binlog模式:

show variables like 'log_%';


显示如下图:


log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。

若binlog模式关闭,则在MySQL配置文件my.cnf设置如下信息:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复


修改完配置文件之后,重启MySQL,使用命令查看是否打开binlog模式


查看binlog日志文件列表:


查看当前正在写入的binlog文件:


3.2 安装canal

官网下载地址:

https://github.com/alibaba/canal/releases


v1.1.5-alpha-2版本为例


1.linux服务器下载:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz


2.创建文件夹:mkdir usr/local/canal


3.解压文件:tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz.tar.gz -C usr/local/canal/


解压后如下图:

打开配置文件conf/example/instance.properties,配置信息如下:

vi conf/example/instance.properties

#################################################

## mysql serverId , v1.0.26+ will autoGen

## v1.0.26版本后会自动生成slaveId,所以可以不用配置 # canal.instance.mysql.slaveId=0 # 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称 canal.instance.master.journal.name=mysql-bin.000001 # mysql主库链接时起始的binlog偏移量canal.instance.master.position=154# mysql主库链接时起始的binlog的时间戳 canal.instance.master.timestamp= canal.instance.master.gtid= # username/password # 在MySQL服务器授权的账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal@123456 # 字符集 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false # table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开 canal.instance.filter.regex=.*\\..* # mysql 数据解析表的黑名单,多个表用,隔开 canal.instance.filter.black.regex=


备注:

canal.instance.filter.regex 配置规则

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表:canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)



4.启动canal服务

window: bin/startup.bat

linux: bin/startup.sh


四、Java客户端操作


4.1 在pom.xml中添加canal的jar包

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.1.4</version>

</dependency>


4.2 编写测试类


    import java.net.InetSocketAddress;
    import java.util.List;


    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.client.*;




    public class CanalTest {


    public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.34",
    11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
    connector.connect();
    connector.subscribe(".*\\..*");
    connector.rollback();
    int totalEntryCount = 1200;
    while (emptyCount < totalEntryCount) {
    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
    long batchId = message.getId();
    int size = message.getEntries().size();
    if (batchId == -1 || size == 0) {
    emptyCount++;
    System.out.println("empty count : " + emptyCount);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    } else {
    emptyCount = 0;
    printEntry(message.getEntries());
    }
    connector.ack(batchId); // 提交确认
    }
    System.out.println("empty too many times, exit");
    }catch (Exception e){
    //connector.rollback(batchId); 处理失败, 回滚数据
    }
    finally {
    connector.disconnect();
    }
    }


    private static void printEntry( List<Entry> entrys) {
    for (Entry entry : entrys) {
    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
    continue;
    }
    RowChange rowChage = null;
    try {
    rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
    }


    EventType eventType = rowChage.getEventType();
    System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    eventType));
    for (RowData rowData : rowChage.getRowDatasList()) {
    if (eventType == EventType.DELETE) {
    printColumn(rowData.getBeforeColumnsList());
    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中
    } else if (eventType == EventType.INSERT) {
    printColumn(rowData.getAfterColumnsList());
    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中
    } else {
    System.out.println("-------> before");
    printColumn(rowData.getBeforeColumnsList());
    System.out.println("-------> after");
    printColumn(rowData.getAfterColumnsList());
    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中
    }
    }
    }
    }


    private static void printColumn( List<Column> columns) {
    for (Column column : columns) {
    System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
        }
    }


    测试结果示例:

      ================》; binlog[mysql-bin.000003:409047] , name[jq_test,user] , eventType : UPDATE
      ------->; before
      id : 22 update=false
      nick_name : 米熊    update=false
      phone : update=false
      head_img_url : https://wx.qlogo.cn/mmopen/Q/132 update=false
      gender : 2 update=false
      applet_open_id : oY0le5V-YzahRYmY5PWMfo update=false
      ip_addr : update=false
      user_type : 1 update=false
      create_time : 2020-03-25 21:21:26 update=false
      update_time : update=false
      union_id : update=false
      public_open_id : update=false
      ------->; after
      id : 22 update=false
      nick_name : 。米哈    update=true
      phone : update=false
      head_img_url : https://wx.qlogo.cn/mmopen/Q/132 update=false
      gender : 2 update=false
      applet_open_id : oY0le5V-YzahRYmY5PWMfo update=false
      ip_addr : update=false
      user_type : 1 update=false
      create_time : 2020-03-25 21:21:26 update=false
      update_time : update=false
      union_id : update=false
      public_open_id : update=false



      文章转载自大飞智家,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论