前几天flink cdc 发布了2.0版本,于是跟进了一下。2.0版本主要解决1.x里的几个重要问题,如下:
全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。
不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。
全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。
其中第一个加锁问题尤为突出,基本在公司生产环境不可用,因为生产环境不可能让你去加全局锁的。cdc2.0 借鉴了netflix发布的DBlog paper来解决加锁问题(参考论文 https://arxiv.org/pdf/2010.12597v1.pdf ) 其核心思想是对表按照主建进行分片,记为Chunk。在对每一个Chunk进行并行读取,最后对所有Chunk进行对齐处理,得到最终的增量读取点,在继续读取。具体详细算法这里不展开了,感兴趣的可以参考这篇文章(https://mp.weixin.qq.com/s/iwY5975XXp7QOBeV0q4TfQ) (2),(3) 两个问题,通过FLIP-27中新的source接口来解决了,可以参考设计文档(https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface )
一个例子
注意:(1)必须开启checkpoint (2)source表必须定义主键
mysql建表语句
CREATE TABLE `orders` (
`order_id` int(11) NOT NULL AUTO_INCREMENT,
`order_date` datetime NOT NULL,
`customer_name` varchar(255) NOT NULL,
`price` decimal(10,5) NOT NULL,
`product_id` int(11) NOT NULL,
`order_status` tinyint(1) NOT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB AUTO_INCREMENT=10006 DEFAULT CHARSET=utf8mb4
初始化数据
INSERT INTO `orders` VALUES (10001,'2020-07-30 10:08:22','Jark',1.00000,102,1),(10002,'2020-07-30 10:11:09','Sally',1.00000,105,1),(10003,'2020-07-30 12:00:30','Edward',1.00000,106,1),(10004,'2020-07-30 15:22:00','Jark',2.00000,104,1),(10005,'2020-07-30 15:22:00','Herden',1.00000,104,1);
flink代码
try {
EnvironmentSettings mySetting = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
//开启checkpoint
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
String ddl1 = ""
+ "CREATE TABLE orders (\n"
+ " order_id INT primary key not enforced,\n"
+ " order_date TIMESTAMP(0),\n"
+ " customer_name STRING,\n"
+ " price DECIMAL(10, 5),\n"
+ " product_id INT,\n"
+ " order_status BOOLEAN\n"
+ ") WITH (\n"
+ " 'connector' = 'mysql-cdc',\n"
+ " 'hostname' = 'localhost',\n"
+ " 'port' = '3306',\n"
+ " 'username' = 'root',\n"
+ " 'password' = 'root',\n"
+ " 'database-name' = 'flink_sql',\n"
+ " 'table-name' = 'orders'\n"
+ ")";
tableEnv.executeSql(ddl1);
String ddlPrint = ""
+ "CREATE TABLE sink_print (\n"
+ " order_id INT primary key not enforced,\n"
+ " order_date TIMESTAMP(0),\n"
+ " customer_name STRING,\n"
+ " price DECIMAL(10, 5),\n"
+ " product_id INT,\n"
+ " order_status BOOLEAN\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ")";
tableEnv.executeSql(ddlPrint);
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql(""
+ "INSERT INTO sink_print\n"
+ "SELECT * \n"
+ "FROM orders\n");
stmtSet.execute();
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.toString());
System.err.println("任务执行失败:" + e.getMessage());
System.exit(-1);
}
在实际测试中发现几个注意点:
从snapshot stage 切换到increment binlog stage 的时候,需要在all snapshot 完成后,下一个checkpoint发生时,才开启后续的increment binlog stage。所有实际测试数据变更后,需要等一下才能观察到增量的改变。 increment binlog stage 的binlog position总是(binlog_name=“”,position=0),这里貌似是计算minStartPosition位点的时候的一个bug,我已经提issue讨论了,见:https://github.com/ververica/flink-cdc-connectors/issues/329 欢迎一起讨论~~~




