暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

canal修改同步位点解析,附canal全流程趟坑实践!

一粒菜鸟 2021-09-27
4126


前言



    在canal的数据同步使用过程中,有时会遇到需要修改同步位点的情况,这里对修改位点操作做一下记录



分析



    我们知道在canal-server的instance的配置文件中有一下配置项是与位点修改相关的


canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=


    第一次修改位点时,我想当然的以为只需要修改这部分配置然后instance自动重启就可以了,但是在修改配置之后,发现并没有像想象中那样按照我设置的时间点去同步消息。


    查看资料和代码发现,同步位点的确定逻辑是当历史记录位点的信息不存在时,才会使用配置文件中的位点信息去解析binlog,相关代码位置


  • com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.java


protected EntryPosition findStartPositionInternal(ErosaConnection connection{
        MysqlConnection mysqlConnection = (MysqlConnection) connection;
        LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
        if (logPosition == null) {// 找不到历史成功记录
            EntryPosition entryPosition = null;
            if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
                entryPosition = masterPosition;
            } else if (standbyInfo != null
                       && mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
                entryPosition = standbyPosition;
            }

            if (entryPosition == null) {
                entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
            }

            // 判断一下是否需要按时间订阅
            if (StringUtils.isEmpty(entryPosition.getJournalName())) {
                // 如果没有指定binlogName,尝试按照timestamp进行查找
                if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { """", entryPosition.getTimestamp() });
                    return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
                } else {
                    logger.warn("prepare to find start position just show master status");
                    return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
                }
            } else {
                if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
                    // 如果指定binlogName + offest,直接返回
                    entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
                                entryPosition.getTimestamp() });
                    return entryPosition;
                } else {
                    EntryPosition specificLogFilePosition = null;
                    if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                        // 如果指定binlogName +
                        // timestamp,但没有指定对应的offest,尝试根据时间找一下offest
                        EntryPosition endPosition = findEndPosition(mysqlConnection);
                        if (endPosition != null) {
                            logger.warn("prepare to find start position {}:{}:{}",
                                new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
                            specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                                entryPosition.getTimestamp(),
                                endPosition,
                                entryPosition.getJournalName(),
                                true);
                        }
                    }

                    if (specificLogFilePosition == null) {
                        // position不存在,从文件头开始
                        entryPosition.setPosition(BINLOG_START_OFFEST);
                        return entryPosition;
                    } else {
                        return specificLogFilePosition;
                    }
                }
            }
        } else {
            if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
                if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
                    // binlog定位位点失败,可能有两个原因:
                    // 1. binlog位点被删除
                    // 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
                    boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                                    && logPosition.getPostion().getServerId() != null
                                    && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
                    if (case2) {
                        long timestamp = logPosition.getPostion().getTimestamp();
                        long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
                        logger.warn("prepare to find start position by last position {}:{}:{}"new Object[] { """",
                                logPosition.getPostion().getTimestamp() });
                        EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
                        // 重新置为一下
                        dumpErrorCount = 0;
                        return findPosition;
                    }

                    Long timestamp = logPosition.getPostion().getTimestamp();
                    if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
                        // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
                        return null;
                    }
                }
                // 其余情况
                logger.warn("prepare to find start position just last position\n {}",
                    JsonUtils.marshalToString(logPosition));
                return logPosition.getPostion();
            } else {
                // 针对切换的情况,考虑回退时间
                long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000;
                logger.warn("prepare to find start position by switch {}:{}:{}"new Object[] { """",
                        logPosition.getPostion().getTimestamp() });
                return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
            }
        }
    }




操作



    了解了位点信息的处理逻辑,修改位点信息就简单了

    

    1、停止instance的同步任务

    

    2、删除相应的位点记录信息

  • canal-server以文件形式记录位点

# canal-sever的配置文件application.yml配置如下
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

    

    使用这种方式记录位点信息,也就是单机模式启动时,我们需要将该instance下的meta.dat文件删除(默认位置是cconf/instance目录下)


  • canal-server以集群模式启动将位点注册在zk中


# canal-sever的配置文件application.yml配置如下
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

 集群模式启动的server,instance位点信息在/otter/canal/destinations/instance/1001/cursor节点中存储,将其删除即可

  

    3、修改instance配置文件中的位点时间,我一般习惯改timestamp


canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=


    4、启动修改的instance即可






希望这篇内容能够对你有所帮助,顺便求一波转发、在看!





 canal实践总集 

🔗

k8s集群下搭建数据同步工具-canal:canal-admin篇

k8s集群下搭建数据同步工具-canal:canal-server篇

k8s集群下搭建数据同步工具-canal:canal-adapter篇

k8s集群下canal-server的伪高可用

k8s集群下canal-adapter连接canal-server域名问题改造

多个canal-server集群共用一套zookeeper解决方案

canal-adapter趟坑记录

canal-adapter:同步hbase版本适配

canal-adapter适配elasticsearch2.x和5.x版本

canal-server适配SASLPLAIN方式鉴权kafka实践




一粒菜鸟


程序猿非硬核技术文档和崩溃日常



我就知道你“在看”


文章转载自一粒菜鸟,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论