场景 Dinky 提交作业 自动 savepoint 恢复 自动 checkpoint 恢复 手动指定 checkpoint 恢复 总结
GitHub 地址 
一、场景
| 组件 | 版本 |
| Flink | 1.14.4 |
| Flink-mysql-cdc | 2.2.1 |
| Mysql | 5.7+ |
| Dinky | 0.6.6 |
二、Dinky 提交作业
依赖准备
将 flink-sql-connector-mysql-cdc-2.2.1.jar 添加到 dinky 根目录 plugins 和 hdfs 集群配置路径上。
依赖图:


Mysql 数据源准备
create database emp_1;use emp_1;CREATE TABLE IF NOT EXISTS `employees_1` (`emp_no` int(11) NOT NULL,`birth_date` date NOT NULL,`first_name` varchar(50) NOT NULL,`last_name` varchar(50) NOT NULL,`gender` enum('M','F') NOT NULL,`hire_date` date NOT NULL,PRIMARY KEY (`emp_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
FlinkSQL 准备
-- 测试参数, 生成环境不需设置SET pipeline.operator-chaining = false;SET table.local-time-zone = Asia/Shanghai;SET execution.runtime-mode = streaming;SET execution.checkpointing.interval = 6000;SET execution.checkpointing.tolerable-failed-checkpoints = 10;SET execution.checkpointing.timeout =600000;SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;SET execution.checkpointing.mode = EXACTLY_ONCE;SET execution.checkpointing.unaligned = true;SET execution.checkpointing.max-concurrent-checkpoints = 1;SET state.checkpoints.num-retained = 3;SET restart-strategy = fixed-delay;SET restart-strategy.fixed-delay.attempts = 10 ;SET restart-strategy.fixed-delay.delay = 20s;SET table.exec.source.cdc-events-duplicate = true;SET table.sql-dialect = default;--SET pipeline.name = hive_catalog_cdc_orders;SET jobmanager.memory.process.size = 1600m;SET taskmanager.memory.process.size = 1780m;SET taskmanager.memory.managed.size = 200m;SET taskmanager.numberOfTaskSlots=1;SET yarn.application.queue= default;EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH ('connector' = 'mysql-cdc','hostname' = 'hadoop102','port' = '3306','username' = 'root','password' = '000000','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'emp_1\.employees_[0-9]+','sink.connector' = 'print',)
补充说明:
Flink 需要开启 checkpoint,并配置好状态后端参数。
配置 SavePoint 策略
SavePoint 策略选择最近一次。

任务提交
因为作业是第一次运行,之前没有做过savepoint,所以作业是一个新的程序,消费两条数据。

Flink WebUI

TaskManager 输出

三、自动 savepoint 恢复
| 名称 | 含义 |
| 智能停止 | 触发一次 SavePoint,并停止作业 |
| SavePoint 触发 | 只触发一次 SavePoint |
| SavePoint 暂停 | 触发一次 SavePoint,并暂停作业 |
| SavePoint 停止 | 触发一次 SavePoint,并停止作业 |
SavePoint 停止作业
点击 '智能停止' 或者 'Savepoint停止',触发一次Savepoint,并停止作业。

运维中心查看作业 SavePoint 记录
等作业停止后,在作业快照 Savepoint 栏中,查看到刚刚成功保存的Savepoint 记录。

数据库中查看 SavePoint 信息
在dlink数据库中,也可以查看到保存的Savepoint元数据。

数据开发查看作业 SavePoint 信息
同时,在'数据开发' 面板对应的作业中,右边栏也可以查看到savepoint记录。

插入一条数据
接下来,往表中插入一条新的数据。
insert into employees_1 VALUES ("55", "2020-09-15", "huang", "meiji", "F", "2022-04-12");
重启作业
作业会自动从之前保存的savepoint处启动。

断点续传
观察到作业,成功做到断点续传,只消费到一条记录。

FlinkWeb UI

TaskManager Stdout
Taskmanager 成功输出一条记录。

四、自动 CheckPoint 恢复
Dinky 的 checkpoint 恢复功能使用非常方便,只需要点击一个按钮即可恢复,整体过程如下所示:
准备数据源
create database emp_2;use emp_2;CREATE TABLE IF NOT EXISTS `employees_2` (`emp_no` int(11) NOT NULL,`birth_date` date NOT NULL,`first_name` varchar(50) NOT NULL,`last_name` varchar(50) NOT NULL,`gender` enum('M','F') NOT NULL,`hire_date` date NOT NULL,PRIMARY KEY (`emp_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- flink sql-- 测试参数, 生成环境不需设置SET pipeline.operator-chaining = false;SET table.local-time-zone = Asia/Shanghai;SET execution.runtime-mode = streaming;SET execution.checkpointing.interval = 6000;SET execution.checkpointing.tolerable-failed-checkpoints = 10;SET execution.checkpointing.timeout =600000;SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;SET execution.checkpointing.mode = EXACTLY_ONCE;SET execution.checkpointing.unaligned = true;SET execution.checkpointing.max-concurrent-checkpoints = 1;SET state.checkpoints.num-retained = 3;SET restart-strategy = fixed-delay;SET restart-strategy.fixed-delay.attempts = 10 ;SET restart-strategy.fixed-delay.delay = 20s;SET table.exec.source.cdc-events-duplicate = true;SET table.sql-dialect = default;--SET pipeline.name = hive_catalog_cdc_orders;SET jobmanager.memory.process.size = 1600m;SET taskmanager.memory.process.size = 1780m;SET taskmanager.memory.managed.size = 200m;SET taskmanager.numberOfTaskSlots=1;SET yarn.application.queue= default;EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH ('connector' = 'mysql-cdc','hostname' = 'hadoop102','port' = '3306','username' = 'root','password' = '000000','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'emp_2\.employees_[0-9]+','sink.connector' = 'print',)
提交作业

插入数据
insert into employees_2 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");insert into employees_2 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
消费两条数据

普通停止
点击 '普通停止',不做savepoint,从checkpoint 处启动。

运维中心查看 checkpoint 信息
停止之后,我们可以从 '作业快照'中,查看到作业保存的checkpoint记录。

这跟hdfs 上保存的checkpoint记录 是一致的。
hdfs 的 checkpoint

恢复最新的 checkpoint

重启后插入一条数据
insert into employees_2 VALUES ("13", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
断点续传

温馨提示
运行 perjob、 app 模式的作业,如果作业被强行kill掉、内部错误等原因导致集群实例销毁, 会导致 Dinky 无法访问 JobManager 来获取 checkpoint 信息,可能存在 dinky 保存的 checkpoint 记录,跟 hdfs 上保存的记录不一致,有可能缺失最新的 checkpoint,所以线上作业恢复 checkpoint 时,需要查看 hdfs 上保存的最新 checkpoint 记录与 dinky 作比较。
五、手动指定 checkpoint 恢复
作业中指定 checkpoint
在上一个步骤中,点击 '此处恢复' 之后,作业能 '断点续传',实际原理是dinky 将 checkpoint 的记录填充到了作业的右边栏,选项为 '指定一次' 然后运行的

从指定 checkpoint 中恢复
所以,dinky也是支持手动指定某处checkpoint 恢复,只需 'SavePoin策略' 选择 '指定一次',将ck路径粘贴到 'SavePointPath',运行即可恢复checkpoint。

温馨提示
运行完毕,如查看到成功恢复ck之后,还请将 'SavePoin策略' 还原回 '最近一次',避免后续从这个检查点再次恢复。
六、总结
优点: 使用dinky,简化了线上作业的部署、运维、作业恢复等操作,增强了flink作业的健壮性。
不足: 如果线上作业过多,'运维中心' 找到指定的作业会比较费力,所以期待 '运维中心',增加能按照 '数据开发' 面板的分目录、分层级查看作业的功能,这样就能快速找到对应的作业。
交流
欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。
QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。
微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。
钉钉社区群(推荐):



扫描二维码获取
更多精彩
Dinky开源





