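Overview
Flink CDC Connectors is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). They are built on the Debezium engine and can make full use of Debezium's capabilities. For more about Debezium, see the official website: https://debezium.io/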

Supported (tested) databases

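Features
1. Reads the database snapshot and then the binlog with exactly-once semantics.
2. With the DataStream API, a single job can consume changes from multiple databases and tables, with no need to deploy Debezium or Kafka separately.
3. With the Table/SQL API, you can create a table with SQL DDL to consume the changes of a single table.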
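Feature 1 combines two phases: a consistent snapshot of the existing rows, followed by reading the binlog from the position that corresponds to that snapshot. The following is only a toy, in-memory model of that idea; the class, the `Event` type and the integer "binlog position" are hypothetical, and this is not how the connector or Debezium is implemented internally. It illustrates why keeping the last applied binlog offset as checkpointed state lets a restart continue without losing or repeating changes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names): take a snapshot, then resume a log from the offset saved with it. */
public class SnapshotThenBinlog {

    /** A hypothetical binlog entry: its position and the row (id -> name) it writes. */
    static final class Event {
        final long position;
        final int id;
        final String name;

        Event(long position, int id, String name) {
            this.position = position;
            this.id = id;
            this.name = name;
        }
    }

    final Map<Integer, String> table = new LinkedHashMap<>();
    long offset;  // the state a checkpoint would store: last binlog position reflected in `table`
    int applied;  // number of binlog events actually applied

    /** Phase 1: copy the existing rows and remember the binlog position they correspond to. */
    void snapshot(Map<Integer, String> rows, long positionAtSnapshot) {
        table.putAll(rows);
        offset = positionAtSnapshot;
    }

    /** Phase 2: apply only events newer than the saved offset, advancing the offset as we go. */
    void readBinlog(List<Event> binlog) {
        for (Event e : binlog) {
            if (e.position <= offset) {
                continue; // already covered by the snapshot or by an earlier read
            }
            table.put(e.id, e.name);
            offset = e.position;
            applied++;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> rows = new LinkedHashMap<>();
        rows.put(1, "zhangsan");
        rows.put(2, "lisi");

        List<Event> binlog = new ArrayList<>();
        binlog.add(new Event(10, 2, "lisi"));   // written before the snapshot point
        binlog.add(new Event(11, 3, "wangwu")); // written after it

        SnapshotThenBinlog reader = new SnapshotThenBinlog();
        reader.snapshot(rows, 10);
        reader.readBinlog(binlog);
        reader.readBinlog(binlog); // simulated re-read after a restart from the saved offset
        System.out.println(reader.table + " offset=" + reader.offset + " applied=" + reader.applied);
        // prints {1=zhangsan, 2=lisi, 3=wangwu} offset=11 applied=1
    }
}
```

The second `readBinlog` call applies nothing, because every event is at or below the stored offset.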
Project repository
GitHub: https://github.com/ververica/flink-cdc-connectors
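Example setup
1). Projects that use the DataStream API or the Table/SQL API need the following Maven dependencies. (For the SQL Client, the corresponding bundled JARs, flink-sql-connector-mysql-cdc and flink-sql-connector-postgres-cdc, are placed in Flink's lib directory instead.)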
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
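2). Prerequisites: MySQL and PostgreSQL must be installed beforehand, in one of the supported (tested) versions listed above.
Hands-on example
1). Create the MySQL database and table, and insert data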
mysql> CREATE DATABASE db_felixzh_cdc;
Query OK, 1 row affected (0.00 sec)

mysql> USE db_felixzh_cdc;
Database changed

mysql> CREATE TABLE tb_products_cdc( id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(64), description VARCHAR(128) );
Query OK, 0 rows affected (0.01 sec)

mysql> INSERT INTO tb_products_cdc VALUES (DEFAULT, 'zhangsan', 'aaa'), (DEFAULT, 'lisi', 'bbb'), (DEFAULT, 'wangwu', 'ccc');
Query OK, 3 rows affected (0.01 sec)
Records: 3  Duplicates: 0  Warnings: 0
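The mysql-cdc source reads the MySQL binlog through Debezium, whose documentation asks for binary logging to be enabled with binlog_format = ROW and binlog_row_image = FULL. The sketch below is a hypothetical pre-flight helper: it does not connect to MySQL, it only checks values you would collect yourself, for example with `SHOW VARIABLES LIKE 'log_bin';` and `SHOW VARIABLES LIKE 'binlog%';`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for the MySQL settings a binlog-based CDC source relies on. */
public class BinlogPrecheck {

    /** Returns the list of problems found; an empty list means the checked settings look fine. */
    static List<String> check(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    /** Records a problem if the variable is missing or differs (case-insensitively) from the wanted value. */
    private static void expect(Map<String, String> vars, String name, String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null || !actual.equalsIgnoreCase(wanted)) {
            problems.add(name + " should be " + wanted + " but is " + actual);
        }
    }

    public static void main(String[] args) {
        // Values as they might be reported by SHOW VARIABLES on a misconfigured server
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "STATEMENT");
        vars.put("binlog_row_image", "FULL");
        System.out.println(check(vars)); // [binlog_format should be ROW but is STATEMENT]
    }
}
```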
2). main method of the MySQL CDC job:
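import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySqlCDC2Print {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: reads the existing rows, then the binlog of db_felixzh_cdc.tb_products_cdc
        String sourceDDL = "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'felixzh',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'passwd',\n" +
                " 'database-name' = 'db_felixzh_cdc',\n" +
                " 'table-name' = 'tb_products_cdc'\n" +
                ")";

        // Sink table: prints every changelog row to stdout
        String sinkDDL = "CREATE TABLE tb_sink (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // Continuous query that copies all changes from the source to the sink
        String transformSQL = "INSERT INTO tb_sink " +
                "SELECT * " +
                "FROM mysql_binlog ";

        tableEnv.executeSql(sourceDDL).print();
        tableEnv.executeSql(sinkDDL).print();
        tableEnv.executeSql(transformSQL).print(); // submits the streaming INSERT job
    }
}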
3). Full source code on GitHub:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/format/src/main/java/com/felixzh/learning/flink/format/debezium_json/MySqlCDC2Print.java
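The DDL above is assembled by hand with string concatenation, which makes it easy to drop a comma or a quote. As a minimal sketch, a small helper can render the WITH clause from an ordered map of the same options; the class `CdcDdl` and its methods are hypothetical and not part of Flink or Flink CDC.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that renders CREATE TABLE ... WITH (...) from columns and connector options. */
public class CdcDdl {

    /** Wraps a value in a SQL string literal, doubling any embedded single quotes. */
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    /** Builds the statement; options are emitted in the map's iteration order. */
    static String createTable(String table, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n", "(\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("  " + quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return "CREATE TABLE " + table + " (\n" + columns + "\n) WITH " + with;
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>(); // keeps options in insertion order
        opts.put("connector", "mysql-cdc");
        opts.put("hostname", "felixzh");
        opts.put("port", "3306");
        opts.put("username", "root");
        opts.put("password", "passwd");
        opts.put("database-name", "db_felixzh_cdc");
        opts.put("table-name", "tb_products_cdc");
        String columns = "  id INT NOT NULL,\n  name STRING,\n  description STRING";
        System.out.println(createTable("mysql_binlog", columns, opts));
    }
}
```

The returned string could then be passed to `tableEnv.executeSql(...)` in place of the hand-written `sourceDDL`; the same helper works for the postgres-cdc options by adding `schema-name`.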
4). Output of running MySqlCDC2Print in the IDE (IntelliJ IDEA)
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
+--------+
| result |
+--------+
0 row in set
14:50:02,488 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
14:50:02,495 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
+------------------------------------------+
| default_catalog.default_database.tb_sink |
+------------------------------------------+
| -1 |
+------------------------------------------+
1 row in set
14:50:04,432 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
14:50:04,688 WARN org.apache.flink.metrics.MetricGroup - The operator name Sink: Sink(table=[default_catalog.default_database.tb_sink], fields=[id, name, description]) exceeded the 80 characters length limit and was truncated.
14:50:04,711 WARN org.apache.flink.metrics.MetricGroup - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, mysql_binlog]], fields=[id, name, description]) exceeded the 80 characters length limit and was truncated.
+I(1,zhangsan,aaa)
+I(2,wangwu,bbb)
+I(3,wangwu,ccc)
+I(5,lisi,ddd)
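Each printed line is a changelog row whose prefix is Flink's row kind: +I is an insert, -U and +U are the before and after images of an update, and -D is a delete. Rows read during the initial snapshot arrive as +I. The sketch below replays lines in this print format into a table keyed by id; the class is hypothetical, the parser assumes field values contain no commas or parentheses, and the -U/+U/-D lines in `main` are made-up examples rather than output of the job above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: replay print-sink style changelog lines into a table keyed by id. */
public class ChangelogReplay {

    /** Applies one line such as "+I(1,zhangsan,aaa)"; assumes fields contain no commas or parentheses. */
    static void apply(Map<Integer, String> table, String line) {
        String kind = line.substring(0, 2);
        String body = line.substring(3, line.length() - 1); // strip "+I(" and the closing ")"
        String[] fields = body.split(",", 2);                // id, then the remaining columns
        int id = Integer.parseInt(fields[0]);
        switch (kind) {
            case "+I": // insert
            case "+U": // update, after image
                table.put(id, fields[1]);
                break;
            case "-U": // update, before image (retracted, replaced by the following +U)
            case "-D": // delete
                table.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unknown row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new TreeMap<>();
        String[] changelog = {
            "+I(1,zhangsan,aaa)", "+I(2,wangwu,bbb)", "+I(3,wangwu,ccc)", "+I(5,lisi,ddd)",
            "-U(2,wangwu,bbb)", "+U(2,lisi,bbb)", // hypothetical update of id 2
            "-D(3,wangwu,ccc)"                    // hypothetical delete of id 3
        };
        for (String line : changelog) {
            apply(table, line);
        }
        System.out.println(table); // {1=zhangsan,aaa, 2=lisi,bbb, 5=lisi,ddd}
    }
}
```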
5). Create the PostgreSQL database and table, and insert data
postgres=# CREATE DATABASE db_felixzh_cdc;
CREATE DATABASE
postgres=# \c db_felixzh_cdc;
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
You are now connected to database "db_felixzh_cdc" as user "postgres".
db_felixzh_cdc=# CREATE TABLE tb_products_cdc (id INT NOT NULL PRIMARY KEY, name VARCHAR(255), description VARCHAR(512));
CREATE TABLE
db_felixzh_cdc=# INSERT INTO tb_products_cdc VALUES (1,'scooter','Small 2-wheel scooter');
INSERT 0 1
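The CREATE TABLE above does not name a schema, so PostgreSQL places the table in the first schema of the search_path that actually exists. With the default search_path ("$user", public) and no schema named after the postgres role, that is public, so the table is public.tb_products_cdc, and public is the value the postgres-cdc 'schema-name' option has to match. The toy function below is a hypothetical model of that rule, not PostgreSQL code, and it ignores system schemas.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of where PostgreSQL creates a table whose name has no schema qualifier. */
public class SearchPathDemo {

    /** Returns the first search_path entry naming an existing schema; "$user" expands to the role name. */
    static String creationSchema(List<String> searchPath, String user, Set<String> existingSchemas) {
        for (String entry : searchPath) {
            String schema = entry.equals("$user") ? user : entry;
            if (existingSchemas.contains(schema)) {
                return schema;
            }
        }
        throw new IllegalStateException("no schema has been selected to create in");
    }

    public static void main(String[] args) {
        List<String> defaultPath = Arrays.asList("$user", "public");  // PostgreSQL's default search_path
        Set<String> schemas = new HashSet<>(Arrays.asList("public")); // a new database, ignoring system schemas
        System.out.println(creationSchema(defaultPath, "postgres", schemas)); // public
    }
}
```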
6). main method of the PostgreSQL CDC job:
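import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PostgreSQLCDC2Print {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the postgres-cdc connector.
        // tb_products_cdc was created without a schema name, so it lives in the 'public' schema.
        String sourceDDL = "CREATE TABLE postgresql_log (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'postgres-cdc',\n" +
                " 'hostname' = 'felixzh',\n" +
                " 'port' = '5432',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' = 'passwd',\n" +
                " 'database-name' = 'db_felixzh_cdc',\n" +
                " 'schema-name' = 'public',\n" +
                " 'table-name' = 'tb_products_cdc'\n" +
                ")";

        // Sink table: prints every changelog row to stdout
        String sinkDDL = "CREATE TABLE tb_sink (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // Continuous query that copies all changes from the source to the sink
        String transformSQL = "INSERT INTO tb_sink " +
                "SELECT * " +
                "FROM postgresql_log ";

        tableEnv.executeSql(sourceDDL).print();
        tableEnv.executeSql(sinkDDL).print();
        tableEnv.executeSql(transformSQL).print(); // submits the streaming INSERT job
    }
}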
7). Full source code on GitHub:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/format/src/main/java/com/felixzh/learning/flink/format/debezium_json/PostgreSQLCDC2Print.java
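8). Output of running PostgreSQLCDC2Print in the IDE (IntelliJ IDEA)
Because my local PostgreSQL version is too old, PostgreSQLCDC2Print threw an exception at run time, so no output is shown here. With a supported PostgreSQL version, the job should behave the same way as the MySQL example.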
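Since the failure above was attributed to an old server, it can help to compare the server's version (for example the result of `SHOW server_version;`) with the oldest version your connector release was tested against before starting the job. The helper below is a hypothetical sketch; the 9.6 minimum used in `main` is an assumption based on the tested PostgreSQL versions the project listed for its 1.x releases, so check it against the supported-databases table for the release you use.

```java
/** Hypothetical helper: is a PostgreSQL server_version string at least a given major.minor? */
public class PgVersionCheck {

    /** Parses the leading numeric part, e.g. "12.4 (Debian 12.4-1.pgdg100+1)" -> [12, 4]. */
    static int[] parse(String serverVersion) {
        String numeric = serverVersion.trim().split("[^0-9.]", 2)[0];
        String[] parts = numeric.split("\\.");
        int[] out = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    /** Compares the first two numeric components; a missing second component counts as 0. */
    static boolean atLeast(String serverVersion, int major, int minor) {
        int[] v = parse(serverVersion);
        int vMajor = v[0];
        int vMinor = v.length > 1 ? v[1] : 0;
        return vMajor != major ? vMajor > major : vMinor >= minor;
    }

    public static void main(String[] args) {
        // Assumed minimum of 9.6; adjust to the versions your connector release was tested with.
        System.out.println(atLeast("9.5.3", 9, 6));                          // false
        System.out.println(atLeast("12.4 (Debian 12.4-1.pgdg100+1)", 9, 6)); // true
    }
}
```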