暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Checkpoint的核心流程(37)

beenrun 2022-10-24
611

本文主要内容

  • Checkpoint的核心流程

  • 对齐Checkpoint

  • 非对齐Checkpoint

  • Checkpoint相关API


Checkpoint核心流程


Job Manager是协调者,operator task是执行者

(1)JobManager的Checkpoint Coordinator会定期向每个source subtask发送命令,start checkkpoint,触发checkpoint(trigger checkpoint)

(2)Source端收到(trigger checkpoint)命令后,产生barrier(类似watermark),并通过广播的方式传递给下游,这个时候,Source subtask会同时执行本地的checkpoint-n,当所有的都执行完成后,会告诉Job Manager,我完成了,给一个应答ack。

(3)当流图的所有节点都完成了checkpoint-n,JobManager会收到所有节点的ack,那么就表示完成了checkpoint-n,这个时候就会向所有task广播checkpoint-n已经完成的通知消息。

这里也就是两阶段提交,

对齐的checkpoint

这里假设有2条数据流,一条是字母流,一条是数字流

(1)算子收到数字流的Barrier,字母流对应Barrier还没有到达

(2)算子收到数字Barrier,会继续从继续从数字流中接收数据,这些数据没有处理,会放入缓存中,等待字母流到达,在字母流到达前的数字会缓存
(3)字母流的Barrier到达,算子开始对齐State,进行异步快照,并将Barrier向下游广播,不会等待快照完毕
(4)算子做异步快照,先处理缓存中的数据,然后再从输入通道中获取数据

非对齐的checkpoint

从Flink1.11开始引入,不能实现精确一致
(1)barrier非对齐:就是当还有其它流的barrier没有到达时,为了不影响性能,也不管他,直接接着处理barrier之后的数据,等到所有的barrier都到达后,就会进行checkpoint
(2)存在问题:当程序恢复,如果是非对齐,那么che-1000快照之前,已经处理了chk-1000对应的offset之后的数据,当程序从chk-1000恢复任务的时候,对应的chk-1000的offset之后的数据会再次处理一次,所以会出现重复消费。这里要注意逻辑处理。
(3)如果您的 Checkpointing 由于背压导致周期非常的长,您应该使用非对齐 Checkpoint。
(4)非对齐checkpoint会增加状态存储IO因此当状态存储IO是整个checkpoint过程真正的瓶颈时,不应当使用非对齐checkpoint
(5)Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费稍长的时间。
(6)Flink 当前并不支持并发的非对齐 Checkpoint。
(7)Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费稍长的时间。

checkpoint相关参数和API

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)


    //checkpoint执行时间间隔,就是job Manager每隔多长时间时间下发一个开始执行checkpoint任务
    env.enableCheckpointing(3000)
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    //保存checkpoint的路径
    env.getCheckpointConfig.setCheckpointStorage(new Path("hdfs://data01:8020/flink-job-checkpoint/"))
    //checkpoint最大失败次数
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
    //checkpoint算法模式,是否需要对齐
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)


    //job取消是否保留checkpoint数据
    env.getCheckpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS)
    //设置checkpoint对齐的超时时间
    env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMillis(2000))


    //两次checkpoint最小时间间隔
    env.getCheckpointConfig.setCheckpointInterval(2000)
    //最大并行checkpoint数量,限制一个都没有完成checkpoint,最多有3个在“飞行中”
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)


    本文主要内容

    • Checkpoint的核心流程

    • 对齐Checkpoint

    • 非对齐Checkpoint

    • Checkpoint相关API


    奇迹的出现往往就在再坚持一下的时候!

    感谢阅读。期待点赞、分享、关注。

    文章转载自beenrun,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论