
Hands-On with Flink CDC Connectors

大数据从业者 2020-12-09

Overview

Flink CDC Connectors is a set of source connectors for Apache Flink that ingest change data capture (CDC) events from different databases. It is implemented on top of the Debezium engine and can make full use of Debezium's capabilities. For details on Debezium, see the official site: https://debezium.io/

Supported (tested) databases

At the 1.1.0 release used below, the project README listed MySQL 5.7 / 8.0.x and PostgreSQL 9.6 through 12 as the tested databases; see the project README for the current compatibility table.

Features

1. Reads the database snapshot and binlogs with exactly-once semantics

2. With the DataStream API, a single job can consume changes from multiple databases and tables, without separately deploying Debezium or Kafka (see the sketch after this list)

3. With the Table/SQL API, a table defined with SQL DDL consumes the changes of a single source table
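
To make point 2 concrete, here is a minimal DataStream API sketch against flink-cdc-connectors 1.1.0. The hostname, credentials, and database name are placeholders reused from the SQL examples later in this post, and the deserializer simply forwards the raw Debezium record as a string; treat it as an assumption-laden sketch rather than production code.

    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class MySqlCdcDataStream {
        public static void main(String[] args) throws Exception {
            // CDC source: reads an initial snapshot of the listed databases/tables,
            // then keeps streaming changes from the binlog.
            SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                    .hostname("felixzh")               // placeholder host, as in the SQL example below
                    .port(3306)
                    .databaseList("db_felixzh_cdc")    // a single job may list several databases/tables
                    .username("root")
                    .password("passwd")
                    .deserializer(new StringDebeziumDeserializationSchema()) // raw Debezium record as String
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.addSource(sourceFunction).print();     // print every change record to stdout
            env.execute("mysql-cdc-datastream");
        }
    }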

Project repository

GitHub: https://github.com/ververica/flink-cdc-connectors

Example setup

1). Whether you use the DataStream API, the Table/SQL API, or the SQL Client, add the following dependencies to your project:

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>1.1.0</version>
    </dependency>

2). Prerequisites: MySQL and PostgreSQL need to be installed in advance; be sure to use versions covered by the list above.
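
Also note that the mysql-cdc connector reads the binlog through the embedded Debezium engine, so MySQL has to run with row-based binary logging, and the connecting user needs the usual replication privileges (REPLICATION SLAVE and REPLICATION CLIENT, plus SELECT on the captured tables). A minimal my.cnf sketch with illustrative values:

    [mysqld]
    server-id        = 1           # any unique, non-zero server id
    log-bin          = mysql-bin   # turn on the binary log
    binlog_format    = ROW         # Debezium requires row-based logging
    binlog_row_image = FULL        # keep full before/after images of changed rows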

Hands-on walkthrough

1). Create a database and table in MySQL and insert some data

      mysql> CREATE DATABASE db_felixzh_cdc;
      Query OK, 1 row affected (0.00 sec)


      mysql> CREATE TABLE tb_products_cdc( id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(64), description VARCHAR(128) );
      Query OK, 0 rows affected (0.01 sec)


      mysql> INSERT INTO tb_products_cdc VALUES (DEFAULT, 'zhangsan', 'aaa'), (DEFAULT, 'lisi', 'bbb'), (DEFAULT, 'wangwu', 'ccc');
      Query OK, 3 rows affected (0.01 sec)
      Records: 3 Duplicates: 0 Warnings: 0
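
Before wiring up the Flink job, it does no harm to double-check that binary logging is actually enabled and row-based on this server (see the prerequisite note above), for example:

      mysql> SHOW VARIABLES LIKE 'log_bin';
      mysql> SHOW VARIABLES LIKE 'binlog_format';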

2). Main method of the MySQL CDC job:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the mysql-cdc connector: it takes a snapshot of
        // tb_products_cdc and then keeps reading the binlog.
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'felixzh',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'passwd',\n" +
                "  'database-name' = 'db_felixzh_cdc',\n" +
                "  'table-name' = 'tb_products_cdc'\n" +
                ")";

        // Sink table using the print connector, so every change record goes to stdout.
        String sinkDDL =
                "CREATE TABLE tb_sink (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // Continuous INSERT that pipes the changelog from the source into the print sink.
        String transformSQL =
                "INSERT INTO tb_sink " +
                "SELECT * " +
                "FROM mysql_binlog ";

        tableEnv.executeSql(sourceDDL).print();
        tableEnv.executeSql(sinkDDL).print();
        tableEnv.executeSql(transformSQL).print();
    }
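
One caveat about the last line: executeSql() submits the INSERT job asynchronously and returns a TableResult immediately (which is why -1 appears as the affected row count in the output below). The author's run evidently stayed alive long enough to print the change records; on Flink 1.12 or later you can make this explicit by blocking on the result instead, a small sketch:

    // Flink 1.12+: block the main thread until the (unbounded) insert job terminates
    tableEnv.executeSql(transformSQL).await();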

3). Complete source code on GitHub:

        https://github.com/felixzh2020/felixzh-learning-flink/blob/master/format/src/main/java/com/felixzh/learning/flink/format/debezium_json/MySqlCDC2Print.java

4). Output of running MySqlCDC2Print in the IDE (IntelliJ IDEA)

          +--------+
          | result |
          +--------+
          | OK |
          +--------+
          1 row in set
          +--------+
          | result |
          +--------+
          0 row in set
          14:50:02,488 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
          14:50:02,495 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
          +------------------------------------------+
          | default_catalog.default_database.tb_sink |
          +------------------------------------------+
          | -1 |
          +------------------------------------------+
          1 row in set
          14:50:04,432 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
          14:50:04,688 WARN org.apache.flink.metrics.MetricGroup - The operator name Sink: Sink(table=[default_catalog.default_database.tb_sink], fields=[id, name, description]) exceeded the 80 characters length limit and was truncated.
          14:50:04,711 WARN org.apache.flink.metrics.MetricGroup - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, mysql_binlog]], fields=[id, name, description]) exceeded the 80 characters length limit and was truncated.
          +I(1,zhangsan,aaa)
          +I(2,wangwu,bbb)
          +I(3,wangwu,ccc)
          +I(5,lisi,ddd)
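
The +I prefix marks insert records, coming both from the initial snapshot and from later binlog inserts. Updates and deletes made in MySQL while the job is running show up as retraction records. For example (illustrative only, not part of the original run), executing

      mysql> UPDATE tb_products_cdc SET description = 'aaa2' WHERE id = 1;
      mysql> DELETE FROM tb_products_cdc WHERE id = 3;

should produce console output of roughly this shape:

      -U(1,zhangsan,aaa)
      +U(1,zhangsan,aaa2)
      -D(3,wangwu,ccc)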

5). Create a database and table in PostgreSQL and insert some data

            postgres=# CREATE DATABASE db_felixzh_cdc;
            CREATE DATABASE
            postgres=# \c db_felixzh_cdc;
            SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
            You are now connected to database "db_felixzh_cdc" as user "postgres".
            db_felixzh_cdc=# CREATE TABLE tb_products_cdc (id INT NOT NULL PRIMARY KEY, name VARCHAR(255),description VARCHAR(512));
            CREATE TABLE


            db_felixzh_cdc=# INSERT INTO tb_products_cdc VALUES (1,'scooter','Small 2-wheel scooter');
            INSERT 0 1
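
The postgres-cdc connector relies on PostgreSQL logical decoding through Debezium, so the server must allow logical replication; Debezium's default decoding plugin is decoderbufs, and wal2json or pgoutput (PostgreSQL 10+) can be selected through the connector's 'decoding.plugin.name' option. A minimal postgresql.conf sketch with illustrative values (a server restart is required afterwards):

    # postgresql.conf
    wal_level = logical          # required for logical decoding
    max_wal_senders = 4          # at least one per replication connection
    max_replication_slots = 4    # the connector creates a replication slot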

6). Main method of the PostgreSQL CDC job:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the postgres-cdc connector (snapshot + logical decoding stream).
        String sourceDDL =
                "CREATE TABLE postgresql_log (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'postgres-cdc',\n" +
                "  'hostname' = 'felixzh',\n" +
                "  'port' = '5432',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = 'passwd',\n" +
                "  'database-name' = 'db_felixzh_cdc',\n" +
                // note: the table created in step 5 lives in the default 'public' schema,
                // so 'public' is probably the value this option actually needs
                "  'schema-name' = 'db_felixzh_cdc',\n" +
                "  'table-name' = 'tb_products_cdc'\n" +
                ")";

        // Print sink, identical to the MySQL example.
        String sinkDDL =
                "CREATE TABLE tb_sink (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // Continuous INSERT from the CDC source into the print sink.
        String transformSQL =
                "INSERT INTO tb_sink " +
                "SELECT * " +
                "FROM postgresql_log ";

        tableEnv.executeSql(sourceDDL).print();
        tableEnv.executeSql(sinkDDL).print();
        tableEnv.executeSql(transformSQL).print();
    }

7). Complete source code on GitHub:

              https://github.com/felixzh2020/felixzh-learning-flink/blob/master/format/src/main/java/com/felixzh/learning/flink/format/debezium_json/PostgreSQLCDC2Print.java

8). Output of running PostgreSQLCDC2Print in the IDE (IntelliJ IDEA)

Because my local PostgreSQL installation is too old, PostgreSQLCDC2Print fails with an exception when I run it (the postgres-cdc connector depends on logical decoding, for which Debezium effectively requires PostgreSQL 9.6 or later), but the job itself should work against a supported version.

