
Preface
In day-to-day business data processing, we frequently need real-time data synchronization between the storage systems of different business modules. For example, a reporting module may rely on incremental updates of order data, and a search index must stay synchronized with business data in real time. For these scenarios, we currently use Flink-CDC as our data synchronization solution.
Flink-CDC (CDC is short for Change Data Capture) is a set of data source connectors for the Apache Flink® ecosystem that embed the Debezium engine. Its purpose is to monitor and capture data changes so that downstream services can react to them. Other mature solutions for CDC scenarios include Maxwell and Canal.


Solution Comparison
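A high-level comparison, based on each project's publicly documented capabilities:

Solution   | Supported data sources                        | Processing & delivery
-----------|-----------------------------------------------|------------------------------------------------------------------
Flink-CDC  | MySQL, PostgreSQL, MongoDB, Oracle, and more  | Full Flink operator set (Map, Filter, Union, ...) plus Flink sinks
Maxwell    | MySQL only                                    | Produces JSON to Kafka, Kinesis, and other message systems
Canal      | MySQL only                                    | Delivers to Kafka, RocketMQ, or custom TCP clients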

The comparison above shows that, relative to Maxwell and Canal, Flink-CDC supports a much wider range of data sources. Because it is built on the Apache Flink® ecosystem, it also provides rich operators such as Map, Filter, and Union for processing data during synchronization.

How It Works
Taking MySQL as an example, Flink-CDC works by reading the database's binlog; the embedded Debezium engine parses binlog entries into row-level change records. Flink-CDC currently offers two APIs for CDC capture, the DataStream API and the SQL API; the examples below use the DataStream API.
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogTask {
    public static void main(String[] args) throws Exception {
        // Define the CDC source: which MySQL instance, database, and tables to watch.
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("host_name")
                .port(3306)  // port takes an int, e.g. 3306
                .databaseList("order_infos")
                .tableList("order_infos.order_info_0", "order_infos.order_info_2")
                .username("user_name")
                .password("user_pass")
                .startupOptions(StartupOptions.initial())  // initial(): snapshot the tables first, then read the binlog
                .deserializer(new JsonDebeziumDeserializationSchema())  // emit each change event as a JSON string
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01");

        // Print change events to stdout (replace with a real sink downstream).
        dataStreamSource.print();
        env.execute();
    }
}
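For comparison, the SQL API declares the same source as a table on the mysql-cdc connector. A minimal sketch; the column list is illustrative and must match the actual table schema:

// Requires flink-table (org.apache.flink.table.api.bridge.java.StreamTableEnvironment)
// and the mysql-cdc SQL connector on the classpath.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
        "CREATE TABLE order_info (" +
        "  order_id STRING," +
        "  order_status STRING," +
        "  total_amount STRING," +
        "  PRIMARY KEY (order_id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector'     = 'mysql-cdc'," +
        "  'hostname'      = 'host_name'," +
        "  'port'          = '3306'," +
        "  'username'      = 'user_name'," +
        "  'password'      = 'user_pass'," +
        "  'database-name' = 'order_infos'," +
        "  'table-name'    = 'order_info_0'" +
        ")");
// order_info can now be queried or piped into a sink table with standard SQL.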

Running the job with the JSON deserializer yields Debezium-style change events. An INSERT carries only the new row under "after", with "op" set to "c":

{
  "after": {
    "order_id": "mock_oid_01",
    "shop_id": "mock_sid_01",
    "order_status": "CANCELLED",
    "payment_method": "Wallet V2",
    "total_amount": "8881.000000",
    "create_datetime": "2020-09-03T03:03:36Z",
    "update_datetime": "2022-04-08T02:23:12Z",
    "transaction_id": "a154111f857514b0"
  },
  "source": {
    "version": "1.4.1.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": "1649384718000",
    "db": "order_infos",
    "table": "order_info_01",
    "server_id": "225",
    "gtid": "d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
    "file": "mysql-bin.000025",
    "pos": "45950793",
    "row": "0",
    "thread": "27273"
  },
  "op": "c",
  "ts_ms": "1649384718067"
}

An UPDATE carries the old row under "before" and the new row under "after", with "op" set to "u":

{
  "before": {
    "order_id": "mock_oid_01",
    "shop_id": "mock_sid_01",
    "order_status": "SHIPPING",
    "payment_method": "Wallet V2",
    "total_amount": "8881.000000",
    "create_datetime": "2020-09-03T03:03:36Z",
    "update_datetime": "2022-04-08T02:23:12Z",
    "transaction_id": "a154111f857514b0"
  },
  "after": {
    "order_id": "mock_oid_01",
    "shop_id": "mock_sid_01",
    "order_status": "CANCELLED",
    "payment_method": "Wallet V2",
    "total_amount": "8881.000000",
    "create_datetime": "2020-09-03T03:03:36Z",
    "update_datetime": "2022-04-08T02:23:12Z",
    "transaction_id": "a154111f857514b0"
  },
  "source": {
    "version": "1.4.1.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": "1649384718000",
    "db": "order_infos",
    "table": "order_info_01",
    "server_id": "225",
    "gtid": "d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
    "file": "mysql-bin.000025",
    "pos": "45950793",
    "row": "0",
    "thread": "27273"
  },
  "op": "u",
  "ts_ms": "1649384718067"
}

A DELETE carries only the old row under "before", with "op" set to "d":

{
  "before": {
    "order_id": "mock_oid_01",
    "shop_id": "mock_sid_01",
    "order_status": "SHIPPING",
    "payment_method": "Wallet V2",
    "total_amount": "8881.000000",
    "create_datetime": "2020-09-03T03:03:36Z",
    "update_datetime": "2022-04-08T02:23:12Z",
    "transaction_id": "a154111f857514b0"
  },
  "source": {
    "version": "1.4.1.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": "1649384718000",
    "db": "order_infos",
    "table": "order_info_01",
    "server_id": "225",
    "gtid": "d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
    "file": "mysql-bin.000025",
    "pos": "45950793",
    "row": "0",
    "thread": "27273"
  },
  "op": "d",
  "ts_ms": "1649384718067"
}
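Downstream logic typically branches on the "op" field to distinguish inserts, updates, and deletes. A minimal sketch using Jackson; the field layout follows the events above, and the println calls stand in for real handlers:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventDispatcher {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Route one Debezium change event by its operation type.
    public static void handleEvent(String json) throws Exception {
        JsonNode event = MAPPER.readTree(json);
        switch (event.get("op").asText()) {
            case "c": // insert: only "after" is present
                System.out.println("INSERT: " + event.get("after"));
                break;
            case "u": // update: both "before" and "after" are present
                System.out.println("UPDATE: " + event.get("before") + " -> " + event.get("after"));
                break;
            case "d": // delete: only "before" is present
                System.out.println("DELETE: " + event.get("before"));
                break;
            case "r": // snapshot read, emitted during the StartupOptions.initial() phase
            default:
                break;
        }
    }
}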
Practice

When changes must be captured from several database instances, one CDC source can be created per instance and the streams merged with union (the initMySQLSource helper below is a sketch that wraps the builder call from the previous example; the host naming is illustrative):

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogTask {
    public static void main(String[] args) throws Exception {
        // Define one CDC source per database instance.
        SourceFunction<String> sourceFunction01 = initMySQLSource(1);
        SourceFunction<String> sourceFunction02 = initMySQLSource(2);
        SourceFunction<String> sourceFunction03 = initMySQLSource(3);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01");
        DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02");
        DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03");

        // Merge the three change streams into a single stream.
        DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03);

        // Print merged events downstream.
        dataStreams.print();
        env.execute();
    }

    // Builds the CDC source for the i-th instance (configuration as in the previous example).
    private static SourceFunction<String> initMySQLSource(int index) {
        return MySQLSource.<String>builder()
                .hostname("host_name_" + index)
                .port(3306)
                .databaseList("order_infos")
                .tableList("order_infos.order_info_0", "order_infos.order_info_2")
                .username("user_name")
                .password("user_pass")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}

Between source and sink, Flink operators can clean and reshape the stream: filter drops malformed events, and map applies a per-record transformation.

// Requires org.apache.flink.api.common.functions.FilterFunction and MapFunction.
// Operators return new streams, so the results must be chained or reassigned to take effect.
DataStream<String> cleanedStream = dataStreamSource
        // Filter out bad events.
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value != null; // discard null (malformed) events
            }
        })
        // Transform each record.
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.trim();
            }
        });

Finally, add an addSink call to configure the stream to be written to a Kafka service.

// Kafka configuration; requires org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer,
// org.apache.flink.api.common.serialization.SimpleStringSchema, and java.util.Properties.
String sinkTopicName = "order_infos_topic";
String bootstrapServers = "broker_host:9092"; // placeholder for the real broker list
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", bootstrapServers);

// Write the change stream to Kafka.
dataStreamSource
        .addSink(new FlinkKafkaProducer<String>(sinkTopicName, new SimpleStringSchema(), sinkProperties))
        .name("write to kafka topic: " + sinkTopicName);
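One production note: the MySQL CDC source stores its binlog position in Flink checkpoints, so enabling checkpointing is what allows a restarted job to resume where it stopped instead of re-reading everything. A minimal sketch; the interval is an illustrative value, not a recommendation from this setup:

// Persist binlog offsets (and sink state) periodically for failure recovery.
// Requires org.apache.flink.streaming.api.CheckpointingMode.
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (illustrative)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);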
Summary
Flink-CDC gives us Debezium-based change capture over MySQL binlogs, together with Flink's operators (union, filter, map) and sink connectors, which is enough to assemble an end-to-end real-time synchronization pipeline from business databases to downstream systems such as Kafka.