Checkpoint
Flink会在输入的数据集上间隔性地生成checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数
据划分到相应的checkpoint中。当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之
前的状态,从而保证数据的一致性。例如在KafkaConsumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况Flink不开启检查点,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其他相关参数
checkpoint测试:
1. 提交job
2. 取消job
3. 基于checkpoint数据 重启job
flink run -c com.state.WordCountCheckpoint -shdfs://node01:9000/flink/sasa/savepoin/Study.jar
如果任务的取消是在第一次checkpoint与第二次checkpoint之间,那么会存在数据的丢失,因为socket是不支持数据回放,如果读取的是kafka 默认支持数据回放
SavePoint原理
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而
导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证
SavePoint的路径需要在flink-conf.yaml中配置
state.savepoints.dir: hdfs://node01:9000/flink/state/savepoint
系统的升级顺序
1. 先savepoint
2. cancel job
flink savepoint 91708180bc440568f47ab0ec88087b43hdfs://node01:9000/flink/sasa如果在flink-conf.yaml中没有设置SavePoint的路径,可以在进行SavePoint的时候指定路径
3. 重启job
flink run -c com.state.WordCountCheckpoint -shdfs://node01:9000/flink/sasa/savepoint-917081-0a251a5323b7/Study.jar
最佳实战:
为了能够在作业的不同版本之间以及Flink的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予ID,这些ID将用于确定每一个算子的状态范围。如果不手动给各算子指定ID,则会由Flink自动给每个算子生成一个ID。而这些自动生成的ID依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置ID




