
Using canal to Capture MySQL's Binary Log

码咖 2019-05-06

Prerequisites

MySQL installed (I used 5.7) and a JDK installed (canal depends on it).

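Enable the MySQL binlog

Enable the binlog and set its format to ROW, so that the row-level changes made by INSERT, UPDATE and DELETE statements are recorded in binary form. Edit /etc/my.cnf, add the following under [mysqld], then restart MySQL: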

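```ini
[mysqld]
log-bin=mysql-bin   # adding this line is enough to turn the binlog on
binlog-format=ROW   # use ROW mode
server_id=1         # must be unique; must not match the server_id of any other MySQL server in the cluster
```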

Verify that the binlog is enabled

Log in to MySQL and run:

```sql
SHOW VARIABLES LIKE 'log_%';
```

If log_bin shows ON, the binlog is enabled.
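If you would rather run this check from code, the rows returned by SHOW VARIABLES can be inspected programmatically. The Java sketch below assumes the Variable_name/Value pairs have already been read into a Map (for example over JDBC, which is not shown), and because binlog_format does not match 'log_%', it also assumes you ran SHOW VARIABLES LIKE 'binlog_format'. BinlogCheck and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether MySQL is set up the way canal needs, given the
// Variable_name -> Value pairs from SHOW VARIABLES (reading them, e.g. over JDBC, is not shown).
class BinlogCheck {

    // log_bin must be ON, otherwise there is no binary log for canal to read
    static boolean binlogEnabled(Map<String, String> vars) {
        return "ON".equalsIgnoreCase(vars.get("log_bin"));
    }

    // binlog_format must be ROW so that each event carries the changed rows themselves
    static boolean rowFormat(Map<String, String> vars) {
        return "ROW".equalsIgnoreCase(vars.get("binlog_format"));
    }

    static boolean readyForCanal(Map<String, String> vars) {
        return binlogEnabled(vars) && rowFormat(vars);
    }

    public static void main(String[] args) {
        // Sample values, as expected after the my.cnf change and a MySQL restart
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "ROW");
        System.out.println("binlog enabled:  " + binlogEnabled(vars));
        System.out.println("ROW format:      " + rowFormat(vars));
        System.out.println("ready for canal: " + readyForCanal(vars));
    }
}
```

A missing variable is treated as "not ready", since equalsIgnoreCase returns false for null.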

Create a MySQL account for canal

Create a dedicated MySQL account that canal uses to read MySQL's binlog.

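```sql
CREATE USER canal IDENTIFIED BY 'canal';
-- ALL PRIVILEGES is broader than canal needs; canal's QuickStart grants only:
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```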

Check the privileges granted to the canal account:

```sql
SHOW GRANTS FOR 'canal';
```

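Download and extract canal

Download it from https://github.com/alibaba/canal/releases. At the time of writing the stable release was v1.1.0, so download canal.deployer-1.1.0.tar.gz and extract it into a canal directory (create the directory if it does not exist).

Note: canal is written entirely in Java, so it needs a JDK; I used 1.8.0_65-b17.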

```shell
# Download
wget https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.deployer-1.1.0.tar.gz
# Create the canal directory
mkdir canal
# Extract the downloaded archive into the canal directory
tar -zxvf canal.deployer-1.1.0.tar.gz -C canal
```

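canal and instance configuration files

A single canal server can host several instances. Each instance monitors one MySQL instance, so several instances can cover MySQL instances running on several servers; in other words, one canal server can monitor a sharded MySQL deployment spread across multiple machines.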
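To make the server/instance relationship concrete: canal.destinations in canal.properties lists the instance names (comma-separated), and each name gets its own directory under the conf directory holding that instance's instance.properties; the example instance used in this article lives in conf/example. The Java sketch below only models that naming convention. InstanceLayout is a hypothetical class, and the second destination example2 is purely illustrative.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of canal's layout: one server, one conf/<destination>/ directory per instance.
class InstanceLayout {

    // Split a canal.destinations value such as "example, example2" into instance names.
    static List<String> destinations(String value) {
        List<String> names = new ArrayList<>();
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Each instance reads its own instance.properties from <confDir>/<destination>/.
    static List<Path> instanceConfigPaths(String confDir, String destinationsValue) {
        List<Path> paths = new ArrayList<>();
        for (String name : destinations(destinationsValue)) {
            paths.add(Paths.get(confDir, name, "instance.properties"));
        }
        return paths;
    }

    public static void main(String[] args) {
        // e.g. one instance per MySQL shard; "example2" is only an illustration
        for (Path p : instanceConfigPaths("canal/conf", "example, example2")) {
            System.out.println(p);
        }
    }
}
```

A client then picks one instance by passing its destination name, which is why the Java client in this article connects with "example".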

(1) canal.properties

canal.properties, in canal/conf, is the global configuration of the canal server. Modify it as follows:

```properties
#################################################
######### common argument #############
#################################################
# must be unique; must not clash with the MySQL server_id
canal.id= 2
canal.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.zkServers=
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
# parallelThreadSize is commented out by default (original value 16); canal runs on a local VM
# with only 1 CPU allocated, which caused an error, so it is set to 1 here
canal.instance.parser.parallelThreadSize = 1
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# rds oss binlog account
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =
#################################################
######### destinations #############
#################################################
canal.destinations= example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# position info: change to your own database details
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
# username/password: change to your own database details
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = test
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
```
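Two details in this file are easy to get wrong: the ring-buffer sizes must be powers of two, and the table filter is written with a doubled backslash because .properties files treat \\ as an escaped backslash, so .*\\..* is read as the regex .*\..* (any schema, a literal dot, any table). The Java sketch below checks both by loading a few of the lines above with java.util.Properties; CanalPropertiesCheck is a hypothetical helper, and it assumes canal reads the value with standard .properties escaping.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical sanity check for a few canal.properties values, loaded with java.util.Properties.
class CanalPropertiesCheck {

    // The comments in canal.properties require ring-buffer sizes to be powers of two.
    static boolean isPowerOfTwo(long n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Does a "schema.table" name pass canal.instance.filter.regex?
    static boolean tableMatches(Properties props, String schemaDotTable) {
        String regex = props.getProperty("canal.instance.filter.regex");
        return Pattern.matches(regex, schemaDotTable);
    }

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // A few lines copied from canal.properties; "\\\\" in Java source is "\\" in the file
        Properties props = load(
                "canal.instance.memory.buffer.size = 16384\n"
              + "canal.instance.parser.parallelBufferSize = 256\n"
              + "canal.instance.filter.regex = .*\\\\..*\n");
        for (String key : new String[] {
                "canal.instance.memory.buffer.size", "canal.instance.parser.parallelBufferSize"}) {
            long size = Long.parseLong(props.getProperty(key).trim());
            System.out.println(key + " = " + size + ", power of two: " + isPowerOfTwo(size));
        }
        System.out.println("regex after loading: " + props.getProperty("canal.instance.filter.regex"));
        System.out.println("test.student matches: " + tableMatches(props, "test.student"));
    }
}
```

The power-of-two test uses the usual n & (n - 1) trick: a power of two has exactly one bit set, so clearing its lowest set bit leaves zero.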

(2) instance.properties

Located at canal/conf/example/instance.properties, this file configures one specific instance; any setting it does not define is inherited from canal.properties. Its contents:

```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info: change address to your own MySQL address
canal.instance.master.address=192.168.204.128:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password: the account and password created in MySQL for canal to sync data
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# database to listen on
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################
```
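The inheritance rule (instance.properties wins, anything it leaves out comes from canal.properties) can be pictured as a two-level lookup. The Java sketch below models it with java.util.Properties and its defaults constructor, using values from the two files above. It illustrates the lookup order only and is not how canal itself merges the files; EffectiveConfig is a hypothetical name.

```java
import java.util.Properties;

// Hypothetical model of the lookup order: instance-level value first, then the global canal.properties value.
class EffectiveConfig {

    static Properties layered(Properties global, Properties instance) {
        Properties effective = new Properties(global); // getProperty falls back to global for missing keys
        effective.putAll(instance);                    // instance-level settings take precedence
        return effective;
    }

    public static void main(String[] args) {
        Properties global = new Properties();
        global.setProperty("canal.instance.master.address", "127.0.0.1:3306");
        global.setProperty("canal.instance.transaction.size", "1024");

        Properties instance = new Properties();
        instance.setProperty("canal.instance.master.address", "192.168.204.128:3306");

        Properties effective = layered(global, instance);
        // Overridden by instance.properties
        System.out.println(effective.getProperty("canal.instance.master.address"));
        // Not set in instance.properties, so taken from canal.properties
        System.out.println(effective.getProperty("canal.instance.transaction.size"));
    }
}
```

Note that the fallback only applies to getProperty; a plain get() on the merged object does not consult the defaults.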

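Create the test database

Check whether MySQL already has a test database, and create it if not (for example with CREATE DATABASE IF NOT EXISTS test;).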

Start canal

Go into canal/bin and run ./startup.sh.

Verify that it is running with ps -ef | grep canal.
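ps only shows that the process exists. A complementary check, sketched below under the assumption that canal.port is left at 11111 as in canal.properties above, is whether the server accepts TCP connections on that port, which is the port the Java client connects to. PortCheck is a hypothetical helper built only on java.net.Socket; it cannot tell canal apart from any other program listening on the same port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: is something accepting TCP connections on host:port?
class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable is listening there
            return false;
        }
    }

    public static void main(String[] args) {
        // 11111 is canal.port from canal.properties; pass your canal server's host as the first argument
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println("canal port 11111 open: " + isListening(host, 11111, 2000));
    }
}
```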

Java client code

Create a Spring Boot project and add the dependency:

```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
```

Create the TestCanal class:

```java
package com.xbq.canal.test;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * @Author: xbq
 * @Date: 2018/9/11 19:16
 * @Description: prints the row changes delivered by the canal instance "example"
 */
public class TestCanal {

    public static void main(String[] args) throws InterruptedException {
        // Step 1: connect to canal (server address, canal.port, destination name, empty username/password)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.204.128", 11111), "example", "", "");
        try {
            connector.connect();
            // Step 2: subscribe, using the table filter configured on the server
            connector.subscribe();
            // Step 3: poll in a loop
            while (true) {
                long batchId = -1;
                try {
                    // Fetch up to 1000 entries without acknowledging them yet
                    Message message = connector.getWithoutAck(1000);
                    batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        System.out.println("No data at the moment");
                        Thread.sleep(1000);
                    } else {
                        System.out.println("-------------------------- Got data --------------------------");
                        printEntry(message.getEntries());
                    }
                    // Acknowledge the batch so that the next call fetches new data
                    connector.ack(batchId);
                } catch (RuntimeException e) {
                    // Processing failed: roll the batch back so that canal delivers it again
                    e.printStackTrace();
                    if (batchId != -1) {
                        connector.rollback(batchId);
                    }
                    Thread.sleep(1000);
                }
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Print every row change contained in a batch of entries.
     * @param entries entries returned by getWithoutAck
     */
    public static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Step 1: unpack the entry
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();
            // Step 2: only ROWDATA entries carry row changes (transaction begin/end entries are skipped)
            if (entryType != EntryType.ROWDATA) {
                continue;
            }
            String tableName = header.getTableName();
            String schemaName = header.getSchemaName();
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Skip an entry that cannot be parsed instead of dereferencing a null RowChange
                e.printStackTrace();
                continue;
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("Now operating on %s.%s, Action= %s", schemaName, tableName, eventType));
            // For QUERY or DDL events, print the SQL and move on to the next entry
            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                System.out.println("rowchange sql ----->" + rowChange.getSql());
                continue;
            }
            // Step 3: drill down to column level
            rowChange.getRowDatasList().forEach(rowData -> {
                if (eventType == EventType.DELETE) {
                    // DELETE: print the columns as they were before the delete
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
                    // INSERT / UPDATE: print the columns as they are after the change
                    printColumn(rowData.getAfterColumnsList());
                }
            });
        }
    }

    /**
     * Print how each column of a row has changed.
     * @param columns columns of one row image
     */
    public static void printColumn(List<Column> columns) {
        columns.forEach(column -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            // Whether this column was modified by the statement
            boolean isUpdated = column.getUpdated();
            System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s",
                    columnName, columnValue, columnType, isUpdated));
        });
    }
}
```

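Run this class. Then create a student table in the MySQL test database and insert, update and delete rows in it: the console prints each change, including the schema and table being operated on, the action, and the column values.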



This article was reposted from 码咖.
