暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

canal-adapter趟坑记录

一粒菜鸟 2020-02-28
971

前言



    canal-adapter目前支持rdb、es、kafka、hbase等多个目标端的同步,最近几篇我会记录一下我在使用adapter向这些目标端同步时,是如何解决es、hbase版本适配,添加部分个性化需求,以及如何处理一些我遇到各种问题。今天先来记录一下adapter同步mysql。




同步mysql



    application.yml


server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  username:
  password:
  vhost:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
      # 在所有目标端的同步中key是一个必须要配置的参数,不配置可能会出现数据丢失的问题
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
          jdbc.username: root
          jdbc.password: 121212






    conf/rdb下配置同步表信息


dataSourceKey: defaultDS
destination: example
groupId: ''
# 在所有目标端的同步中outerAdapterKey是一个必须要配置的参数,不配置可能会出现数据丢失的问题
# 对应application.yml中key字段
outerAdapterKey: mysql1
concurrent: false
dbMapping:
  commitBatch: 5000
  etlCondition: null
  mirrorDb: false
  readBatch: 5000
  database: mytest # 源数据源的database/shcema
  table: tb1 # 源数据源表名
  targetTable: mytest2.tb1 # 目标数据源的库名.表名
  targetPk: # 主键映射
    ID: ID # 如果是复合主键可以换行映射多个
  mapAll: true




问题



    在同步过程中,出现了insert成功,delete和update日志反馈成功,但实际数据没有同步的现象,查看同步源码





    com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.java


for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            
            //首先注意映射字段冒号前是目标字段名,冒号后是源端字段名
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            
            //如果未配置冒号后字段,将其处理成与冒号前一致
            if (srcColumnName == null) {
                srcColumnName = Util.cleanColumn(targetColumnName);
            }

            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
            }
            
            //主要问题出在这里
            Object value = data.get(srcColumnName);
            BatchExecutor.setValue(values, type, value);
        }


这段代码,能够看出,同步过程中更新的value值是存储在data这个Map结构中的,所以这就要求srcColumnName这个字段的值要与源端数据表的字段名大小写完全一致(即使数据库本身对于字段大小写是不敏感的)




举例




源端tb有ID、id1、Id2三列


目标端tb2有id、ID1、id2三列


其实在数据查询过程中,由于数据库对字段大小写不敏感,所有无论怎么写都是没问题的,但在配置同步表过程中,他们的表现就有所不同


targetPk: # 主键映射
    ID: ID
    
targetPk: # 主键映射
 ID:
  
targetPk: # 主键映射
 id: ID
    
#以上几种写法,最后解析出来的srcColumnName 均为ID,所以可以从有binlog解析出来的数据转换得到的源端Map类型的data中获取到数据



targetPk: # 主键映射
  id:
#类似这种写法,解析出的srcColumnName 均为id,
#而Map类型中data中只包含ID这个key,而不包含id这个key,
#所以就取不到相应的值,导致需要主键操作的update和delete语句执行无效


这是我在同步rdb时遇到的需要注意的地方,当然也可以根据需要将这部分代码改造一下,这里就不作描述了,代码具体位置已给出。



 往期推荐 

🔗

k8s集群下搭建数据同步工具-canal:canal-admin篇

k8s集群下搭建数据同步工具-canal:canal-server篇

k8s集群下搭建数据同步工具-canal:canal-adapter篇

k8s集群下canal-server的伪高可用

k8s集群下canal-adapter连接canal-server域名问题改造

多个canal-server集群共用一套zookeeper解决方案



一粒菜鸟


程序猿的非硬核技术文档和崩溃日常


文章转载自一粒菜鸟,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论