暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

FlinkCDC2.0利用FlinkSQL采集MySQL

大数据研习社 2022-07-18
686

长按二维码关注

大数据领域必关注的公众号


1.依赖管理
将如下依赖包放到FLINK_HOME/lib下。
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-connector-jdbc_2.11-1.14.3.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

2.Flink全局配置
修改flink-conf.yaml文件:
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    execution.checkpointing.max-concurrent-checkpoints: 1
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.timeout: 10min
    state.backend: filesystem
    state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints

    3.sql-client提交作业模式
    1.Standalone模式
    启动sql-client:bin/sql-client.sh embedded
    注意,如果使用standalone模式运行,需要先启动一个Flink standalone集群,方法如下:
    bin/start-cluster.sh

    2.yarn-session模式(本案例使用方式)
    先启动Flink yarn-session集群:bin/yarn-session.sh -s 1 -jm 1024 -tm 1024
    然后再启动sql-client:bin/sql-client.sh embedded -s yarn-session

    4.checkpoint配置
    官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing
    #sql-client设置checkpoint参数
      SET 'execution.checkpointing.interval' = '10s';
      SET 'parallelism.default' = '3';

      5.创建source table
        CREATE TABLE `cars`(
        `id` BIGINT,
        `owerId` BIGINT,
        `carCode` STRING,
        `carColor` STRING,
        `type` BIGINT,
        `remark` STRING,
        PRIMARY KEY(id) NOT ENFORCED
        ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'hadoop1',
        'port' = '3306',
        'username' = 'hive',
        'password' = 'hive',
        'database-name' = 'sca',
        'table-name' = 'cars',
        'connect.timeout' = '60s'
        );

        6.创建sink table
          CREATE TABLE `cars_copy`(
          `id` BIGINT,
          `owerId` BIGINT,
          `carCode` STRING,
          `carColor` STRING,
          `type` BIGINT,
          `remark` STRING,
          PRIMARY KEY(id) NOT ENFORCED
          ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
          'username' = 'hive',
          'password' = 'hive',
          'table-name' = 'cars_copy',
          'sink.parallelism' = '2'
          );

          7.source to sink table
          将采集过来的数据写入MySQL
            insert into cars_copy SELECT * FROM cars;

            查询结果表数据记录数
              select count(*) from cars_copy

              新增测试数据集(再次查看结果表)
                insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096006','10244815','港T·7RONE','红色','1',NULL);
                insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096007','10244816','港T·7RONE','黄色','1',NULL);
                备注:如果通过手动cacel job,下次重新启动job仍然会重头采集表中的数据。

                8.cacel job时保存Save point
                  bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd
                  备注:结尾一个参数为yarn中的job id,第二个参数为flink的job id。

                  9.cacel job之后重新恢复job
                  #设置job从上一次savepoint位置开始处理
                    SET 'execution.checkpointing.interval' = '10s';
                    SET 'parallelism.default' = '3';
                    SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';
                    备注:该参数的值为savepoint路径。

                    #执行flink sql job
                      insert into cars_copy SELECT * FROM cars;

                      #新增测试数据集
                        insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096008','10244815','港T·7RONE','红色','1',NULL);
                        insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096009','10244816','港T·7RONE','黄色','1',NULL);


                        #再次查询结果表数据记录数
                          select count(*) from cars_copy

                          正常情况,这个时候采集的就是新增数据,历史数据不会再采集。
                          欢迎点赞 + 收藏 + 在看  素质三连 


                          往期精彩回顾
                          程序员,如何避免内卷
                          Apache 架构师总结的 30 条架构原则
                          【全网首发】Hadoop 3.0分布式集群安装
                          大数据运维工程师经典面试题汇总(附带答案)
                          大数据面试130题
                          某集团大数据平台整体架构及实施方案完整目录
                          大数据凉凉了?Apache将一众大数据开源项目束之高阁!
                          实战企业数据湖,抢先数仓新玩法
                          Superset制作智慧数据大屏,看它就够了
                          Apache Flink 在快手的过去、现在和未来
                          华为云-基于Ambari构建大数据平台(上)
                          华为云-基于Ambari构建大数据平台(下)
                          【HBase调优】Hbase万亿级存储性能优化总结
                          【Python精华】100个Python练手小程序
                          【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
                          【剑指Offer】近50个常见算法面试题的Java实现代码

                          长按识别左侧二维码

                               关注领福利    

                            领10本经典大数据书

                          文章转载自大数据研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论