暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于Flink-CDC数据同步⽅案

领创集团Advance Group 2022-04-08
1640

前言

在业务数据处理过程中,我们时常会遇到不同业务模块 存储系统间实时数据同步需求。比如, 报表模块依赖订单模块数据进行增量更新,检索引擎依赖业务数据进行实时同步等。针对这类场景,我们目前采用了Flink-CDC的技术方案用于数据同步。

Flink-CDC(CDC,全称是 Change Data Capture),是基于Apache Flink® 生态的数据源连接器,该连接器集成了Debezium引擎。其目标是为了用于监控和捕获数据变更,以便下游服务针对变更进行相应处理。基于CDC场景,比较成熟的解决方案还包括 Maxwell、Canal等 。



方案对比

Flink - CDC(Debezium)/ Maxwell Canal

以上是我们进行的解决方案对比,可以看到相较于Maxwell、Canal,Flink-CDC在数据源支持上更为丰富,同时基于Apache Flink®生态,在数据同步任务的处理过程中,还提供了诸如Map、Filter、Union的丰富算子支持。

Flink-CDC ⽀持数据源、数据下游⽀持:



原理

以Mysql为例,Flink-CDC的底层原理实际上是基于读取数据库的Binlog日志,同时内置集成的Debezium引擎,会将Binlog日志解析为行级变更的数据结构。目前Flink-CDC支持 DataStream-API SQL-API 两种实现进行CDC监控,以下我们主要以DataStream-API实现举例。

3.1 配置代码
DataStream API实现:
    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;


    public class MySqlBinlogTask {


    public static void main(String[] args) throws Exception {
    # 定义数据源
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("host_name")
    .port("port")
    .databaseList("order_infos")
    .tableList(["order_infos.order_info_0", "order_infos.order_info_2"])
    .username("user_name")
    .password("user_pass")
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01");
    # 数据下游输出
    dataStreamSource.print();
    env.execute();
    }
    }


    到这⾥Flink-CDC的基础配置已经完成,启动后如果order_infos.order_info_0数据变更,那么程序就 可以监听到对应的变更信息了。⽰例代码的下游配置为标准输出流,实际在线上业务场景,可通过dataStreamSource.addSink()的⽅式对接下游服务,⽐如Kafka RabbitMq Elasticsearch等。
    3.2 数据结构
    基本操作 Debezium 格式化后的数据结构,包含 before、after、source、op、ts_ms 5个字段,含义 如下:

    INSERT⽰例:
      {
      # New Row Data
      "after":{
      "order_id":"mock_oid_01",
      "shop_id":"mock_sid_01",
      "order_status":"CANCELLED",
      "payment_method":"Wallet V2",
      "total_amount":"8881.000000",
      "create_datetime":"2020-09-03T03:03:36Z",
      "update_datetime":"2022-04-08T02:23:12Z",
      "transaction_id":"a154111f857514b0"
      },
      # Metadata
      "source":{
      "version":"1.4.1.Final",
      "connector":"mysql",
      "name":"mysql_binlog_source",
      "ts_ms":"1649384718000",
      "db":"order_infos",
      "table":"order_info_01",
      "server_id":"225",
      "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
      "file":"mysql-bin.000025",
      "pos":"45950793",
      "row":"0",
      "thread":"27273"
      },
      "op":"c",
      "ts_ms":"1649384718067"
      }
      UPDATE⽰例:
        {
        # Old Row Data
        "before":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"SHIPPING",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
        },
        # New Row Data
        "after":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"CANCELLED",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
        },
        # Metadata
        "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
        },
        "op":"u",
        "ts_ms":"1649384718067"
        }

        DELETE⽰例:
          {
          # Old Row Data
          "before":{
          "order_id":"mock_oid_01",
          "shop_id":"mock_sid_01",
          "order_status":"SHIPPING",
          "payment_method":"Wallet V2",
          "total_amount":"8881.000000",
          "create_datetime":"2020-09-03T03:03:36Z",
          "update_datetime":"2022-04-08T02:23:12Z",
          "transaction_id":"a154111f857514b0"
          },
          # Metadata
          "source":{
          "version":"1.4.1.Final",
          "connector":"mysql",
          "name":"mysql_binlog_source",
          "ts_ms":"1649384718000",
          "db":"order_infos",
          "table":"order_info_01",
          "server_id":"225",
          "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
          "file":"mysql-bin.000025",
          "pos":"45950793",
          "row":"0",
          "thread":"27273"
          },
          "op":"d",
          "ts_ms":"1649384718067"
          }


          实践

          4.1多数据源
          在多个数据源同步场景下,Flink提供了union算⼦⽅便进⾏多数据流的合并。
          拓扑结构:

          示例代码:
            import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
            import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.streaming.api.functions.source.SourceFunction;


            public class MySqlBinlogTask {


            public static void main(String[] args) throws Exception {
            # 定义多个数据源
            SourceFunction<String> sourceFunction01 = initMySQLSource(1)
            SourceFunction<String> sourceFunction02 = initMySQLSource(2)
            SourceFunction<String> sourceFunction03 = initMySQLSource(3)
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01");
            DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02");
            DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03");
            # 多数据流合并
            DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03);
            # 数据下游输出
            dataStreams.print();
            env.execute();
            }
            }

            4.2数据过滤&转换
            增加Filter算子进行异常数据过滤、增加Map算子进行数据格式转换。
            拓扑结构:
            示例代码:
              # 过滤异常数据
              dataStreamSource.filter(new FilterFunction<String>() {
              @Override
              public boolean filter(String value) throws Exception {
              if (value == null) {
              return false;
              }
              return true;
              }
              });
              # 数据转换
              dataStreamSource.map(new MapFunction<String, String>() {
              @Override
              public String map(String value) throws Exception {
              return value.trim();
              }
              });
              4.3数据流写入Kafka

              增加addSink调用,配置数据流写入Kafka服务。

              拓扑结构:
              示例代码:
                # Kafka配置
                String sinkTopicName = "order_infos_topic";
                Properties sinkProperties = new Properties();
                sinkProperties.setProperty("bootstrap.servers", bsServersSB.toString());


                # 数据流写入Kafka
                dataStreamSource.addSink(new FlinkKafkaProducer<String>(
                sinkTopicName, new SimpleStringSchema(), sinkProperties))
                .name("write to kafka topic: " + sinkTopicName );


                总结

                到这里一个基本的Flink-CDC的数据同步逻辑就实现了。Flink-CDC方案,目前已落地生产环境并得到有效验证,日均千万级的数据同步,业务检索系统可达到秒级同步,报表数据可达到分钟级同步。
                当然,这其中也包含了基于生产环境更多因素优化。比如Flink任务基于窗口的数据合并,任务并行度配置等。
                后续,随着业务数据的增长,数据同步仍然会面临很多挑战,我们会持续优化并完善数据同步方案,也欢迎对数据同步 ETL感兴趣的同学,可以提出您的建议共同学习交流。


                参考资料

                https://github.com/ververica/flink-cdc-connectors
                https://github.com/zendesk/maxwell
                https://github.com/alibaba/canal
                https://debezium.io/

                感谢阅读「技术创想」第42期文章
                领创集团正在春季招聘中
                期待你的加入
                点击文末
                阅读原文
                获取更多
                招聘信息

                关于领创集团

                (Advance Intelligence Group)
                领创集团成立于 2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021年 9月,领创集团宣布完成超4亿美元 D 轮融资,融资完成后领创集团估值已超 20亿美元,成为新加坡最大的独立科技创业公司之一。
                文章转载自领创集团Advance Group,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论