暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SeaTunnel 即将支持 CDC 功能,设计思路曝光!

SeaTunnel 2022-11-22
3473

 点亮 ⭐️ Star · 照亮开源之路

https://github.com/apache/incubator-seatunnel


作者 | 李宗文,SeaTunnel Committer

SeaTunnel CDC 支持的背景


变更数据捕获 (CDC) 是指识别和捕获对数据库中的数据所做的更改,再将这些更改实时传送到下游流程或系统的过程。


CDC 的实现主要通过基于查询和基于 Binlog 两种方式。我们知道 MySQL 有 binlog(二进制日志)来记录用户对数据库的更改,因此,使用 binlog 来完成 CDC 实现是最简单、最高效、合乎逻辑的方法之一。当然,已经有许多开箱即用的开源 MySQL CDC 实现方式,使用 binlog 并不是实现 CDC 的唯一方法(至少对于 MySQL 而言),即使是数据库触发器也可以执行类似的功能,但在效率和对数据库的影响方面可能会相形见绌。

通常情况下,CDC 捕获到数据库的更改后,会将更改事件发布到消息队列中供消费者使用,例如 Debezium,它可以持续将 MySQL(也支持 PostgreSQL、Mongo 等)的变化发送给 Kafka,通过订阅 Kafka 中的事件,我们可以获取更改的内容并实现我们需要的功能。

我认为 CDC 对数据同步至关重要,SeaTunnel 需要将它作为一个重要的功能特性。以下是 SeaTunnel 需要这个功能的原因,以及我对 SeaTunnel 支持 CDC 的整体设计。希望可以听听大家的意见,用最佳的方式在 SeaTunnel 中进行实现。

动机


  1. 支持并行读取历史数据(快速同步,亿级大表)
  2. 支持读取增量数据(CDC)
  3. 支持心跳检测(metrics、小流量表)
  4. 支持动态添加新表(更易于操作和维护)

整体设计






CDC 的基本流程包括:
1. 快照阶段:读取表的历史数据
  • 最小拆分粒度:表的主键范围数据
2. 增量阶段:读取表的增量日志变化数据
  • 最小拆分粒度:一张表

快照阶段

DS



枚举器生成一个表的多个 SnapshotSplit,并将它们分配给 reader。

    /  pseudo-code. 
    public class SnapshotSplit implements SourceSplit {
    private final String splitId;
    private final TableId tableId;
    private final SeaTunnelRowType splitKeyType;
    private final Object splitStart;
    private final Object splitEnd;
    }
    当 SnapshotSplit 读取完成时,读取器将拆分的高水位标记报告给枚举器。当所有 SnapshotSplit 报告高水位线时,枚举器开始增量阶段。

      //  pseudo-code. 
      public class CompletedSnapshotSplitReportEvent implements SourceEvent {
      private final String splitId;
      private final Offset highWatermark;
      }

      快照阶段 - SnapshotSplit 读取流程

      有4个步骤:

      • 日志低水位线:读取快照数据前获取当前日志偏移量。
      • Read SnapshotSplit data:读取属于split 的数据范围
        • 案例1:步骤1&2不能原子化(MySQL)
      因为我们不能加表锁,也不能加基于低水位的区间锁,所以第 1 步和第 2 步不是孤立的。

      • exactly-once:使用内存表保存历史数据 & 过滤日志数据从低水位线到高水位线
      • 至少一次:直接输出数据并使用低水印而不是高水印
      • 案例 2:步骤 1 和 2 可以原子化(Oracle)
      • 可以使用 for scn 来保证两步的原子化

      • Exactly-Once:直接输出数据并使用低位线而不是高位线
      3. 登录高位线:

      • step 2 案例 1 & Exactly-Once:读取快照数据后获取当前日志偏移量。
      • 其他:使用低位线代替高位线
      4.如果高位线>低位线,读取范围日志数据

      快照阶段—MySQL Snapshot Read & Exactly-once


      因为我们无法确定查询语句在高低水位之间执行的位置,为了保证数据的 exactly-once,我们需要使用内存表来临时保存数据。

      1. 日志低水位线:读取快照数据前获取当前日志偏移量。
      2. 读取 SnapshotSplit 数据:读取属于 split 的范围数据,写入内存表。
      3. 日志高水位线:读取快照数据后获取当前日志偏移量。
      4. 读取范围日志数据:读取日志数据并写入内存表
      5. 输出内存表的数据,释放内存使用量。

      增量阶段

      DS


      当所有快照拆分报告水位时,开始增量阶段。

      结合所有快照拆分和水位信息,获得 LogSplits。

      我们希望最小化日志连接的数量:

      • 增量阶段默认只有一个 reader 工作,用户也可以配置选项指定数量(不能超过 reader 数量)
      • 一个 reader 最多获得一个连接
        //  pseudo-code. 
        public class LogSplit implements SourceSplit {
        private final String splitId;
        /**
        * All the tables that this log split needs to capture.
        */
        private final List<TableId> tableIds;
        /**
        * Minimum watermark for SnapshotSplits for all tables in this LogSplit
        */
        private final Offset startingOffset;
        /**
        * Obtained by configuration, may not end
        */
        private final Offset endingOffset;
        /**
        * SnapshotSplit information for all tables in this LogSplit.
        * </br> Used to support Exactly-Once.
        */
        private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos;
        /**
        * Maximum watermark in SnapshotSplits per table.
        * </br> Used to delete information in completedSnapshotSplitInfos, reducing state size.
        * </br> Used to support Exactly-Once.
        */
        private final Map<TableId, Offset> tableWatermarks;
        }
        // pseudo-code.
        public class CompletedSnapshotSplitInfo implements Serializable {
        private final String splitId;
        private final TableId tableId;
        private final SeaTunnelRowType splitKeyType;
        private final Object splitStart;
        private final Object splitEnd;
        private final Offset watermark;
        }

        Exactly-Once:

        • 阶段 1:在水印数据之前使用 completedSnapshotSplitInfos 过滤器。
        • 阶段2:表不再需要过滤,在 completedSnapshotSplitInfos 中删除属于该表的数据,因为后面的数据需要处理。
        At-Least-Once:无需过滤数据,且 completedSnapshotSplitInfos 不需要任何数据。

        动态发现新表

        DS


        场景 1:发现新表时,枚举器处于快照阶段,直接分配新的 split。

        场景 2:发现新表时,枚举器处于递增阶段。

        在增量阶段动态发现新表。

        1. 暂停 LogSplit reader。(如果有空闲的 reader,需要暂停吗?)
        2. Reader 暂停运行。
        3. Reader 报告当前日志偏移量。(如不上报,reader 需支持 LogSplit 组合)
        4. 将 SnapshotSplit 分配给阅读器。
        5. Reader 执行快照阶段读取。
        6. Reader 报告所有 SnapshotSplit 水印。
        7. 为 Reader 分配一个新的 LogSplit。
        8. Reader 再次开始增量阅读并向枚举器确认。

        多个结构化表格

        DS

        • 优点:占用数据库连接少,减少数据库压力
        • 缺点:在 SeaTunnel 引擎中,多个表会在一个管道中,容错的粒度会变大。


        这个特性允许源支持读取多个结构表,再使用侧流输出与单表流保持一致。

        此外,由于这会涉及对 DAG 和转换模块的更改,我还希望可以支持定义分区器(HASH 和FORWARD分区器)。

        本文里提到的部分功能已经实现,详情见#2490 https://github.com/apache/incubator-seatunnel/issues/2490

        你对这个架构有什么看法?或者如果您有兴趣与我们的团队一起构建它,欢迎在问题下发表评论以加入我们:https://github.com/apache/incubator-seatunnel/issues/2394


        Apache SeaTunnel


        Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

        仓库地址: 
        https://github.com/apache/incubator-seatunnel

        网址:
        https://seatunnel.apache.org/

        Proposal:
        https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

        Apache SeaTunnel(Incubating)  下载地址:
        https://seatunnel.apache.org/download
         
        衷心欢迎更多人加入!

        我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

        我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

        提交问题和建议:
        https://github.com/apache/incubator-seatunnel/issues

        贡献代码:
        https://github.com/apache/incubator-seatunnel/pulls

        订阅社区开发邮件列表 : 
        dev-subscribe@seatunnel.apache.org

        开发邮件列表:
        dev@seatunnel.apache.org

        加入 Slack:
        https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

        关注 Twitter: 
        https://twitter.com/ASFSeaTunnel

        往期推荐




        为什么我们要自研全球首款大数据同步引擎 SeaTunnel Engine?




        SeaTunnel Connector 接入激励计划第二期“战报”来了!




        现代数据堆栈下的 Apache 项目



        点击“阅读原文”参与共建!

        文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论