暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dinky从checkpoint与savepoint自动恢复整库同步作业

Dinky开源 2022-08-29
1995
摘要:本文由韩公子老师带了 Dinky 实时计算平台从 checkpoint 与 savepoint 自动恢复整库同步作业的实操过程分享。内容包括:
  1. 场景
  2. Dinky 提交作业
  3. 自动 savepoint 恢复
  4. 自动 checkpoint 恢复
  5. 手动指定 checkpoint 恢复
  6. 总结


Tips:历史传送门
Dinky on k8s 整库同步实践
Dinky 实践系列之 Flink Catalog 元数据管理
Dinky实践系列之FlinkCDC整库实时入仓入湖
Dinky FlinkCDC 整库入仓 StarRocks
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~




一、场景

    使用 Dinky 自动 savepoint、checkpoint 恢复整库同步作业。
组件
版本
Flink
1.14.4
Flink-mysql-cdc
2.2.1
Mysql
5.7+
Dinky0.6.6
    温馨提示:  由于 Fink 自身 bug,Dinky 暂时不支持 Flink1.15.x 版本做 savepoint 处理, 请等待后续更新支持,或者使用小于 Flink1.15 的版本。



二、Dinky 提交作业

依赖准备

将 flink-sql-connector-mysql-cdc-2.2.1.jar 添加到 dinky 根目录 plugins 和 hdfs 集群配置路径上。

依赖图:

Mysql 数据源准备

    create database emp_1;


    use emp_1;


    CREATE TABLE IF NOT EXISTS `employees_1` (
    `emp_no` int(11) NOT NULL,
    `birth_date` date NOT NULL,
    `first_name` varchar(50) NOT NULL,
    `last_name` varchar(50) NOT NULL,
    `gender` enum('M','F') NOT NULL,
    `hire_date` date NOT NULL,
    PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


    insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
    insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

    FlinkSQL 准备

      -- 测试参数, 生成环境不需设置
      SET pipeline.operator-chaining = false;


      SET table.local-time-zone = Asia/Shanghai;
      SET execution.runtime-mode = streaming;
      SET execution.checkpointing.interval = 6000;
      SET execution.checkpointing.tolerable-failed-checkpoints = 10;
      SET execution.checkpointing.timeout =600000;
      SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
      SET execution.checkpointing.mode = EXACTLY_ONCE;
      SET execution.checkpointing.unaligned = true;
      SET execution.checkpointing.max-concurrent-checkpoints = 1;
      SET state.checkpoints.num-retained = 3;
      SET restart-strategy = fixed-delay;
      SET restart-strategy.fixed-delay.attempts = 10 ;
      SET restart-strategy.fixed-delay.delay = 20s;
      SET table.exec.source.cdc-events-duplicate = true;
      SET table.sql-dialect = default;
      --SET pipeline.name = hive_catalog_cdc_orders;
      SET jobmanager.memory.process.size = 1600m;
      SET taskmanager.memory.process.size = 1780m;
      SET taskmanager.memory.managed.size = 200m;
      SET taskmanager.numberOfTaskSlots=1;
      SET yarn.application.queue= default;


      EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'hadoop102',
      'port' = '3306',
      'username' = 'root',
      'password' = '000000',
      'checkpoint' = '3000',
      'scan.startup.mode' = 'initial',
      'parallelism' = '1',
      'table-name' = 'emp_1\.employees_[0-9]+',
      'sink.connector' = 'print',
      )

      补充说明:

      Flink 需要开启 checkpoint,并配置好状态后端参数。

      配置 SavePoint 策略

          SavePoint 策略选择最近一次。

      任务提交

          因为作业是第一次运行,之前没有做过savepoint,所以作业是一个新的程序,消费两条数据。

      Flink WebUI

      TaskManager 输出





      三、自动 savepoint 恢复

          查看作业详情栏, 如下图右上角所示, 他们的含义分别为:
      名称
      含义
      智能停止触发一次 SavePoint,并停止作业
      SavePoint 触发
      只触发一次 SavePoint
      SavePoint 暂停触发一次 SavePoint,并暂停作业
      SavePoint 停止触发一次 SavePoint,并停止作业

      SavePoint 停止作业


          点击 '智能停止' 或者 'Savepoint停止',触发一次Savepoint,并停止作业。

      运维中心查看作业 SavePoint 记录

          等作业停止后,在作业快照 Savepoint 栏中,查看到刚刚成功保存的Savepoint 记录。

      数据库中查看 SavePoint 信息

          在dlink数据库中,也可以查看到保存的Savepoint元数据。

      数据开发查看作业 SavePoint 信息

          同时,在'数据开发' 面板对应的作业中,右边栏也可以查看到savepoint记录。

      插入一条数据

          接下来,往表中插入一条新的数据。

        insert into employees_1 VALUES ("55", "2020-09-15", "huang", "meiji", "F", "2022-04-12");

        重启作业

            作业会自动从之前保存的savepoint处启动。

        断点续传

            观察到作业,成功做到断点续传,只消费到一条记录。

        FlinkWeb UI

        TaskManager Stdout

            Taskmanager 成功输出一条记录。



        四、自动 CheckPoint 恢复

            Dinky 的 checkpoint 恢复功能使用非常方便,只需要点击一个按钮即可恢复,整体过程如下所示:

        准备数据源

          create database emp_2;


          use emp_2;


          CREATE TABLE IF NOT EXISTS `employees_2` (
          `emp_no` int(11) NOT NULL,
          `birth_date` date NOT NULL,
          `first_name` varchar(50) NOT NULL,
          `last_name` varchar(50) NOT NULL,
          `gender` enum('M','F') NOT NULL,
          `hire_date` date NOT NULL,
          PRIMARY KEY (`emp_no`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


          -- flink sql
          -- 测试参数, 生成环境不需设置
          SET pipeline.operator-chaining = false;


          SET table.local-time-zone = Asia/Shanghai;
          SET execution.runtime-mode = streaming;
          SET execution.checkpointing.interval = 6000;
          SET execution.checkpointing.tolerable-failed-checkpoints = 10;
          SET execution.checkpointing.timeout =600000;
          SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
          SET execution.checkpointing.mode = EXACTLY_ONCE;
          SET execution.checkpointing.unaligned = true;
          SET execution.checkpointing.max-concurrent-checkpoints = 1;
          SET state.checkpoints.num-retained = 3;
          SET restart-strategy = fixed-delay;
          SET restart-strategy.fixed-delay.attempts = 10 ;
          SET restart-strategy.fixed-delay.delay = 20s;
          SET table.exec.source.cdc-events-duplicate = true;
          SET table.sql-dialect = default;
          --SET pipeline.name = hive_catalog_cdc_orders;
          SET jobmanager.memory.process.size = 1600m;
          SET taskmanager.memory.process.size = 1780m;
          SET taskmanager.memory.managed.size = 200m;
          SET taskmanager.numberOfTaskSlots=1;
          SET yarn.application.queue= default;


          EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'hadoop102',
          'port' = '3306',
          'username' = 'root',
          'password' = '000000',
          'checkpoint' = '3000',
          'scan.startup.mode' = 'initial',
          'parallelism' = '1',
          'table-name' = 'emp_2\.employees_[0-9]+',
          'sink.connector' = 'print',
          )

          提交作业

          插入数据

            insert into employees_2 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
            insert into employees_2 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

            消费两条数据

            普通停止

                点击 '普通停止',不做savepoint,从checkpoint 处启动。

            运维中心查看 checkpoint 信息

                停止之后,我们可以从 '作业快照'中,查看到作业保存的checkpoint记录。

                这跟hdfs 上保存的checkpoint记录 是一致的。

            hdfs 的 checkpoint

            恢复最新的 checkpoint

            重启后插入一条数据

              insert into employees_2 VALUES ("13", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");

              断点续传

              温馨提示

                  运行 perjob、 app 模式的作业,如果作业被强行kill掉、内部错误等原因导致集群实例销毁, 会导致 Dinky 无法访问 JobManager 来获取 checkpoint 信息,可能存在 dinky 保存的 checkpoint 记录,跟 hdfs 上保存的记录不一致,有可能缺失最新的 checkpoint,所以线上作业恢复 checkpoint 时,需要查看 hdfs 上保存的最新 checkpoint 记录与 dinky 作比较。



              五、手动指定 checkpoint 恢复


              作业中指定 checkpoint

                  在上一个步骤中,点击 '此处恢复' 之后,作业能 '断点续传',实际原理是dinky 将 checkpoint 的记录填充到了作业的右边栏,选项为 '指定一次' 然后运行的

              从指定 checkpoint 中恢复

              所以,dinky也是支持手动指定某处checkpoint 恢复,只需 'SavePoin策略' 选择 '指定一次',将ck路径粘贴到 'SavePointPath',运行即可恢复checkpoint。

              温馨提示

                  运行完毕,如查看到成功恢复ck之后,还请将 'SavePoin策略' 还原回 '最近一次',避免后续从这个检查点再次恢复。



              六、总结

                  优点: 使用dinky,简化了线上作业的部署、运维、作业恢复等操作,增强了flink作业的健壮性。

                  不足: 如果线上作业过多,'运维中心' 找到指定的作业会比较费力,所以期待 '运维中心',增加能按照 '数据开发' 面板的分目录、分层级查看作业的功能,这样就能快速找到对应的作业。





              交流

              欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

              QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。

              微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。

              钉钉社区群(推荐):

                     公众号:Dinky开源



              扫描二维码获取

              更多精彩

              Dinky开源




              文章转载自Dinky开源,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论