暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

flink checkpoint 原理与实践

大数据启示录 2022-02-15
957
接下来我给大家分享一下四大基石的最后一个知识点:checkpoint,全文总共分为3大部分,10个小点:


flink checkpoint知识点


1、什么是Checkpoint检查点?
2、什么是Savepoint保存点?
3、什么是CheckpointCoordinator检查点协议器?
4、Checkpoint中保存的是什么信息?
5、检查点恢复机制?
6、保存点恢复机制?
7、checkpoint 语义
8、Checkpoint如何实现轻量级异步分布式快照?
9、什么是barrier对齐?
10、什么是barrier不对齐?
11、checkpoint 执行全过程
12、checkpoint 常见失败原因和注意事项

什么是Checkpoint检查点?


Checkpoint被叫做检查点,是Flink实现容错机制最核心的功能,是Flink可靠性的基石,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互,用户可以在程序启动之前,设置好实时任务 Checkpoint 相关的参数,当任务启动之后,剩下的就全交给 Flink 自行管理。

Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法

注意:区分State和Checkpoint

1.State:

一般指一个具体的Task/Operator的状态(operator的状态表示一些算子在运行的过程中会产生的一些中间结果)

State数据默认保存在Java的堆内存中/TaskManage节点的内存中

State可以被记录,在失败的情况下数据还可以恢复。

2.Checkpoint:

       表示了一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator的状态

       可以理解为Checkpoint是把State数据定时持久化存储了

比如KafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取。

什么是SavePoint


保存点在 Flink 中叫作 Savepoint. 是基于Flink 检查点机制的应用完整快照备份机制. 用来保存状态 可以在另一个集群或者另一个时间点.从保存的状态中将作业恢复回来。适用 于应用升级、集群迁移、 Flink 集群版本更新、A/B测试以及假定场景、暂停和重启、归档等场景。保存点可以视为一个(算子 ID -> State) 的Map,对于每一个有状态的算子,Key是算子ID,Value是算子State。

Savepoint 需要由程序员手动执行,而Checkpoint 是由程序自动执行。

什么是检查点协调器


点协调器


Flink中检查点协调器叫作 CheckpointCoordinator,负责协调 Flink 算子的 State 的分布式快照。当触发快照的时候,CheckpointCoordinator向 Source 算子中注入Barrier消息 ,然后等待所有的Task通知检查点确认完成,同时持有所有 Task 在确认完成消息中上报的State句柄。

Checkpoint中保存的是什么信息




检查点里面到底保存着什么信息呢?我们以flink消费kafka数据wordcount为例:


1、我们从Kafka读取到一条条的日志,从日志中解析出app_id,然后将统计的结果放到内存中一个Map集合,app_id做为key,对应的pv做为value,每次只需要将相应app_id 的pv值+1后put到Map中即可;
2、kafka topic:test;
3、flink运算流程如下:

kafka topic有且只有一个分区

假设kafka的topic-test只有一个分区,flink的Source task记录了当前消费到kafka test topic的所有partition的offset

例:(0,1000)表示0号partition目前消费到offset为1000的数据

Flink的pv task记录了当前计算的各app的pv值,为了方便讲解,我这里有两个app:app1、app2

例:(app1,50000)(app2,10000)
表示app1当前pv值为50000
表示app2当前pv值为10000
每来一条数据,只需要确定相应app_id,将相应的value值+1后put到map中即可;


该案例中,CheckPoint保存的其实就是第n次CheckPoint消费的offset信息和各app的pv值信息,记录一下发生CheckPoint当前的状态信息,并将该状态信息保存到相应的状态后端。图下代码:(注:状态后端是保存状态的地方,决定状态如何保存,如何保障状态高可用,我们只需要知道,我们能从状态后端拿到offset信息和pv信息即可。状态后端必须是高可用的,否则我们的状态后端经常出现故障,会导致无法通过checkpoint来恢复我们的应用程序)。
chk-100
offset:(01000
pv:(app1,50000)(app2,10000
该状态信息表示第100CheckPoint的时候, partition 0 offset消费到了1000,pv统计结果为(app1,50000)(app2,10000)

总结:

  1. 当前检查点开始时数据源(例如Kafka)中消息的offset。

  2. 记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

当作业失败后,检查点如何恢复作业?



Flink提供了 应用自动恢复机制  手动作业恢复机制

应用自动恢复机制:

Flink设置有作业失败重启策略,包含三种:
1、定期恢复策略:fixed-delay
固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过最大的重启次数,Job最终将失败,在连续两次重启尝试之间,重启策略会等待一个固定时间,默认Integer.MAX_VALUE次
2、失败比率策略:failure-rate
失败率重启策略在job失败后重启,但是超过失败率后,Job会最终被认定失败,在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

3、直接失败策略:None   失败不重启

手动作业恢复机制

因为Flink检查点目录分别对应的是JobId,每通过flink run 方式/页面提交方式恢复都会重新生成 jobId,Flink 提供了在启动之时通过设置 -s .参数指定检查点目录的功能,让新的 jobld 读取该检查点元文件信息和状态信息,从而达到指定时间节点启动作业的目的。
启动方式如下:
/bin/flink -s /flink/checkpoints/03112312a12398740a87393/chk-50/_metadata -p @Parallelisim -c @ Mainclass @jar

当作业失败后,从保存点如何恢复作业?



从保存点恢复作业并不简单,尤其是在作业变更(如修改逻辑、修复 bug) 的情况下, 需要考虑如下几点:
(1)算子的顺序改变 
如果对应的 UID 没变,则可以恢复,如果对应的 UID 变了恢复失败。
(2)作业中添加了新的算子 
如果是无状态算子,没有影响,可以正常恢复,如果是有状态的算子,跟无状态的算子 一样处理。
(3)从作业中删除了一个有状态的算子
默认需要恢复保存点中所记录的所有算子的状态,如果删除了一个有状态的算子,从保存点回复的时候被删除的OperatorID找不到,所以会报错 可以通过在命令中添加 
-- allowNonReStoredSlale (short: -n )跳过无法恢复的算子 。
(4)添加和删除无状态的算子
如果手动设置了 UID 则可以恢复,保存点中不记录无状态的算子 如果是自动分配的 UID ,那么有状态算子的可能会变( Flink 一个单调递增的计数器生成 UID,DAG 改版,计数器极有可能会变) 很有可能恢复失败。

flink checkpoint语义


Flink Checkpoint 支持两种语义:Exactly_OnceAt_least_Once,默认的 Checkpoint 语义是 Exactly_Once。具体语义含义如下:

Exactly_Once 含义是:保证每条数据对于 Flink 任务的状态结果只影响一次。打个比方,比如 WordCount 程序,目前实时统计的 "hello" 这个单词数为 5,同时这个结果在这次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又来 2 个 "hello" 单词,突然程序遇到外部异常自动容错恢复,会从最近的 Checkpoint 点开始恢复,那么会从单词数为 5 的这个状态点开始恢复,Kafka 消费的数据点位也是状态为 5 这个点位开始计算,所以即使程序遇到外部异常自动恢复时,也不会影响到 Flink 状态的结果计算。

At_Least_Once 含义是:每条数据对于 Flink 任务的状态计算至少影响一次。比如在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,因为同一条消息,当 Flink 任务容错恢复后,可能将其计算多次。

Flink 中 Exactly_Once 和 At_Least_Once 具体是针对 Flink 任务状态而言的,并不是 Flink 程序对消息记录只处理一次。举个例子,当前 Flink 任务正在做 Checkpoint,该次 Checkpoint 还没有完成,这次 Checkpoint 时间段的数据其实已经进入 Flink 程序处理,只是程序状态没有最终存储到远程存储。当程序突然遇到异常,进行容错恢复时,那么就会从最新的 Checkpoint 进行状态恢复重启,上一次 Checkpoint 成功到这次 Checkpoint 失败的数据还会进入 Flink 系统重新处理,具体实例如下图:

上图中表示一个 WordCount 实时任务的 Checkpoint,在进行 chk-5 Checkpoint 时,突然遇到程序异常,那么实时任务会从 chk-4 进行恢复,那么之前 chk-5 处理的数据,Flink 系统会再次进行处理。不过这些数据的状态没有 Checkpoint 成功,所以 Flink 任务容错恢复再次运行时,对于状态的影响还是只有一次。

Exactly_Once 和 At_Least_Once 具体在底层实现大致相同,具体差异表现在 CheckpointBarrier 对齐方式的处理:

如果是 Exactly_Once 模式,某个算子的 Task 有多个输入通道时,当其中一个输入通道收到 CheckpointBarrier 时,Flink Task 会阻塞该通道,其不会处理该通道后续数据,但是会将这些数据缓存起来,一旦完成了所有输入通道的 CheckpointBarrier 对齐,才会继续对这些数据进行消费处理。

对于 At_least_Once,同样针对某个算子的 Task 有多个输入通道的情况下,当某个输入通道接收到 CheckpointBarrier 时,它不同于 Exactly Once,即使没有完成所有输入通道 CheckpointBarrier 对齐,At Least Once 也会继续处理后续接收到的数据。所以使用 At Least Once 不能保证数据对于状态计算只有一次的计算影响。

如何实现轻量级异步分布式快照?



Flink快照主要包括两部分数据一部分是数据流的数据,另一部分是operator的状态数据。对应的快照机制的实现有主要两个部分组成,一个是屏障(Barrier),一个是状态(State)。因为Flink这里处理的数据流,数据在多个operator的DAG拓扑中持续流动,要想实现某个时刻快照可以用于系统故障恢复,必须保证这个快照,完全能够确定某一个时刻状态,这个时刻之前的数据全部处理完,之后的数据一个都没有处理。这里就引入了屏障这个概念


要实现分布式快照,最关键的是能够将数据流切分。Flink 中使用 Barrier (屏障)来切分数据 流。Barrierr 会周期性地注入数据流中,作为数据流的一部分,从上游到下游被算子处理。Barrier 会严格保证顺序,不会超过其前边的数据。Barrier 将记录分割成记录集,两个 Barrier 之间的数据流中的数据隶属于同一个检查点。每一个 Barrier 都携带一个其所属快照的 ID 编号。Barrier 随着数据向下流动,不会打断数据流,因此非常轻量。 在一个数据流中,可能会存在多个隶属于不同快照的 Barrier ,并发异步地执行分布式快照,如下图所示:


Barrier 会在数据流源头被注人并行数据流中。Barrier n所在的位置就是恢复时数据重新处理的起始位置。 例如,在Kafka中,这个位置就是最后一个记录在分区内的偏移量 ( offset) ,作业恢复时,会根据这个位置从这个偏移量之后向 kafka 请求数据 这个偏移量就是State中保存的内容之一。

Barrier 接着向下游传递。当一个非数据源算子从所有的输入流中收到了快照 n 的Barrier时,该算子就会对自己的 State 保存快照,并向自己的下游 广播 发送快照 n 的 Barrier。一旦Sink 算子接收到 Barrier ,有两种情况:

(1)如果是引擎内严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie  n 时, Sink 算子对自己的 State 进行快照,然后通知检查点协调器( CheckpointCoordinator) 。当所有 的算子都向检查点协调器汇报成功之后,检查点协调器向所有的算子确认本次快照完成。
(2)如果是端到端严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie n 时, Sink 算子对自己的 State 进行快照,并预提交事务(两阶段提交的第一阶段),再通知检查点协调器( CheckpointCoordinator) ,检查点协调器向所有的算子确认本次快照完成,Sink 算子提交事务(两阶段提交的第二阶段),本次事务完成。


我们接着04的案例来具体说一下如何执行分布式快照:


对应到pv案例中就是,Source Task接收到JobManager的编号为chk-100(从最近一次恢复)的CheckPoint触发请求后,发现自己恰好接收到kafka offset(0,1000)处的数据,所以会往offset(0,1000)数据之后offset(0,1001)数据之前安插一个barrier,然后自己开始做快照,也就是将offset(0,1000)保存到状态后端chk-100中。然后barrier接着往下游发送,当统计pv的task接收到barrier后,也会暂停处理数据,将自己内存中保存的pv信息(app1,50000)(app2,10000)保存到状态后端chk-100中。OK,flink大概就是通过这个原理来保存快照的;
统计pv的task接收到barrier,就意味着barrier之前的数据都处理了,所以说,不会出现丢数据的情况。

什么是barrier对齐



                      


一旦Operator从输入流接收到CheckPoint barrier n,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到barrier n为止。否则,它会混合属于快照n的记录和属于快照n + 1的记录;

如上图所示:

图1,算子收到数字流的Barrier,字母流对应的barrier尚未到达
图2,算子收到数字流的Barrier,会继续从数字流中接收数据,但这些流只能被搁置,记录不能被处理,而是放入缓存中,等待字母流 Barrier到达。在字母流到达前, 1,2,3数据已经被缓存。
图3,字母流到达,算子开始对齐State进行异步快照,并将Barrier向下游广播,并不等待快照执行完毕。
图4,算子做异步快照,首先处理缓存中积压数据,然后再从输入通道中获取数据。

什么是barrier不对齐


checkpoint 是要等到所有的barrier全部都到才算完成

上述图2中,当还有其他输入流的barrier还没有到达时,会把已到达的barrier之后的数据1、2、3搁置在缓冲区,等待其他流的barrier到达后才能处理

barrier不对齐:就是指当还有其他流的barrier还没到达时,为了不影响性能,也不用理会,直接处理barrier之后的数据。等到所有流的barrier的都到达后,就可以对该Operator做CheckPoint了;

为什么要进行barrier对齐?不对齐到底行不行?

答:Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;

CheckPoint的目的就是为了保存快照,如果不对齐,那么在chk-100快照之前,已经处理了一些chk-100 对应的offset之后的数据,当程序从chk-100恢复任务时,chk-100对应的offset之后的数据还会被处理一次,所以就出现了重复消费。

checkpoint 执行全过程


Checkpoint由JM的Checkpoint Coordinator发起

第一步,Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint;。

第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。

第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。

这里分为同步和异步(如果开启的话)两个阶段:

1.同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)

执行快照的过程:

a.对state做深拷贝。

b.将写操作封装在异步的FutureTask中

FutureTask的作用包括:1)打开输入流2)写入状态的元数据信息3)写入状态4)关闭输入流

2.异步阶段:

1)执行同步阶段创建的FutureTask

2)向Checkpoint Coordinator发送ACK响应


第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。


同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。


最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。


checkpoint 常见失败原因和注意事项


下面是设置 Flink Checkpoint 参数配置的建议及注意点:

  1. 当 Checkpoint 时间比设置的 Checkpoint 间隔时间要长时,可以设置 Checkpoint 间最小时间间隔。这样在上次 Checkpoint 完成时,不会立马进行下一次 Checkpoint,而是会等待一个最小时间间隔,之后再进行 Checkpoint。否则,每次 Checkpoint 完成时,就会立马开始下一次 Checkpoint,系统会有很多资源消耗 Checkpoint 方面,而真正任务计算的资源就会变少。
  2. 如果Flink状态很大,在进行恢复时,需要从远程存储上读取状态进行恢复,如果状态文件过大,此时可能导致任务恢复很慢,大量的时间浪费在网络传输方面。此时可以设置 Flink Task 本地状态恢复,任务状态本地恢复默认没有开启,可以设置参数 state.backend.local-recovery
    值为 true
    进行激活。
  3. Checkpoint 保存数,Checkpoint 保存数默认是1,也就是只保存最新的 Checkpoint 的状态文件,当进行状态恢复时,如果最新的 Checkpoint 文件不可用时(比如 HDFS 文件所有副本都损坏或者其他原因),那么状态恢复就会失败,如果设置 Checkpoint 保存数 2,即使最新的Checkpoint恢复失败,那么Flink 会回滚到之前那一次 Checkpoint 的状态文件进行恢复。考虑到这种情况,用户可以增加 Checkpoint 保存数。
  4. 建议设置的 Checkpoint 的间隔时间最好大于 Checkpoint 的完成时间。

下图是不设置 Checkpoint 最小时间间隔示例图,可以看到,系统一致在进行 Checkpoint,大量的资源使用在 Flink Chekpoint 上,可能对运行的任务产生一定影响:

还有一种特殊的情况,Flink 端到端 Sink 的 EXACTLYONCE 的问题,也就是数据从 Flink 端到外部消息系统的消息一致性。打个比方,Flink 输出数据到 Kafka 消息系统中,如果使用 Kafka 0.10 的版本,Flink 不支持端到端的 EXACTLYONCE,可能存在消息重复输入到 Kafka。

如上图所示,当做 chk-5 Checkpoint 的时候,chk-5 失败,然后从 chk-4 来进行恢复,但是 chk-5 的部分数据在 Chekpoint 失败之前就有部分进入到 Kafka 消息系统,再次恢复时,该部分数据可能再次重放到 Kafka 消息系统中。

Flink 中解决端到端的一致性有两种方法:做幂等以及事务写,幂等的话,可以使用 KV 存储系统来做幂等,因为 KV 存储系统的多次操作结果都是相同的。Flink 内部目前支持二阶段事务提交,Kafka 0.11 以上版本支持事务写,所以支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。

2 Checkpoint 异常情况排查
2.1 Checkpoint 失败

可以在 Checkpoint 界面看到如下图所示,下图中 Checkpoint 10423 失败了。

点击 Checkpoint 10423 的详情,我们可以看到类似下图所示的表格(下图中将 operator 名字截取掉了)。

上图中我们看到三行,表示三个 operator,其中每一列的含义分别如下:

  • 其中 Acknowledged
    一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中我们可以知道第三个 operator 总共有 5 个 subtask,但是只有 4 个进行了 ack;
  • 第二列 Latest Acknowledgement
    表示该 operator 的所有 subtask 最后 ack 的时间;
  • End to End Duration
    表示整个 operator 的所有 subtask 中完成 snapshot 的最长时间;
  • State Size
    表示当前 Checkpoint 的 state 大小 -- 主要这里如果是增量 checkpoint 的话,则表示增量大小;
  • Buffered During Alignment
    表示在 barrier 对齐阶段积攒了多少数据,如果这个数据过大也间接表示对齐比较慢);

Checkpoint 失败大致分为两种情况:Checkpoint Decline 和 Checkpoint Expire。

 2.1.1 Checkpoint Decline

我们能从 jobmanager.log
中看到类似下面的日志Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.
其中10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1
是 execution id,85d268e6fbc19411185f7e4868a44178
是 job id,我们可以在 jobmanager.log
中查找 execution id,找到被调度到哪个 taskmanager 上,类似如下所示:

2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - XXXXXXXXXXX (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.
2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

从上面的日志我们知道该 execution 被调度到 hostnameABCDE
container_e24_1566836790522_8088_04_013155_1
slot 上,接下来我们就可以到 container  container_e24_1566836790522_8088_04_013155
的 taskmanager.log 中查找 Checkpoint 失败的具体原因了。

另外对于 Checkpoint Decline 的情况,有一种情况我们在这里单独抽取出来进行介绍:Checkpoint Cancel。

当前 Flink 中如果较小的 Checkpoint 还没有对齐的情况下,收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消掉。我们可以看到类似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

这个日志表示,当前 Checkpoint 19 还在对齐阶段,我们收到了 Checkpoint 20 的 barrier。然后会逐级通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。

在下游 task 收到被 cancelBarrier 的时候,会打印类似如下的日志:

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

或者

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

或者

WARN
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

上面三种日志都表示当前 task 接收到上游发送过来的 barrierCancel 消息,从而取消了对应的 Checkpoint。

 2.1.2 Checkpoint Expire

如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。当一个 Checkpoint 由于超时而失败是,会在 jobmanager.log
中看到如下的日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178  expired before completing.

表示 Chekpoint 1 由于超时而失败,这个时候可以可以看这个日志后面是否有类似下面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

可以按照 2.1.1 中的方法找到对应的 taskmanager.log 查看具体信息。

下面的日志如果是 DEBUG 的话,我们会在开始处标记 DEBUG

我们按照下面的日志把 TM 端的 snapshot 分为三个阶段,开始做 snapshot 前,同步阶段,异步阶段:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

这个日志表示 TM 端 barrier 对齐后,准备开始做 Checkpoint。

DEBUG
2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx
_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.

上面的日志表示当前这个 backend 的同步阶段完成,共使用了 0 ms。

DEBUG
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms

上面的日志表示异步阶段完成,异步阶段使用了 369 ms

在现有的日志情况下,我们通过上面三个日志,定位 snapshot 是开始晚,同步阶段做的慢,还是异步阶段做的慢。然后再按照情况继续进一步排查问题。

2.2 Checkpoint 慢

在 2.1 节中,我们介绍了 Checkpoint 失败的排查思路,本节会分情况介绍 Checkpoint 慢的情况。

Checkpoint 慢的情况如下:比如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟(我们希望 1 分钟左右就能够做完),而且我们预期 state size 不是非常大。

对于 Checkpoint 慢的情况,我们可以按照下面的顺序逐一检查

 2.2.0 Source Trigger Checkpoint 慢

这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考 [1])。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log
中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack
进行进一步确认锁的持有情况。

 2.2.1 使用增量 Checkpoint

现在 Flink 中 Checkpoint 有两种模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。

现在 Flink 中仅在 RocksDBStateBackend 中支持增量 Checkpoint,如果你已经使用 RocksDBStateBackend,可以通过开启增量 Checkpoint 来加速,具体的可以参考 [2]。

 2.2.2 作业存在反压或者数据倾斜

我们知道 task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间,这两个可以通过如下的页面进行检查:

上图中我们选择了一个 task,查看所有 subtask 的反压情况,发现都是 high,表示反压情况严重,这种情况下会导致下游接收 barrier 比较晚。

上图中我们选择其中一个 operator,点击所有的 subtask,然后按照 Records Received/Bytes Received/TPS 从大到小进行排序,能看到前面几个 subtask 会比其他的 subtask 要处理的数据多。

如果存在反压或者数据倾斜的情况,我们需要首先解决反压或者数据倾斜问题之后,再查看 Checkpoint 的时间是否符合预期。

 2.2.2 Barrier 对齐慢

从前面我们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot。

barrier 对齐之后会有如下日志打印:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果 taskmanager.log
中没有这个日志,则表示 barrier 一直没有对齐,接下来我们需要了解哪些上游的 barrier 没有发送下来,如果你使用 At Least Once 的话,可以观察下面的日志:

DEBUG
Received barrier for checkpoint 96508 from channel 5

表示该 task 收到了 channel 5 来的 barrier,然后看对应 Checkpoint,再查看还剩哪些上游的 barrier 没有接受到,对于 ExactlyOnce 暂时没有类似的日志,可以考虑自己添加,或者 jmap 查看。

 2.2.3 主线程太忙,导致没机会做 snapshot

在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,在这一步我们需要能够查看某个 PID 对应 hotmethod,这里推荐两个方法:

  1. 多次连续 jstack,查看一直处于 RUNNABLE
    状态的线程有哪些;
  2. 使用工具 AsyncProfile dump 一份火焰图,查看占用 CPU 最多的栈;

如果有其他更方便的方法当然更好,也欢迎推荐。

 2.2.4 同步阶段做的慢

同步阶段一般不会太慢,但是如果我们通过日志发现同步阶段比较慢的话,对于非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具。对于 RocksDBBackend 来说,我们可以用 iostate
查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少。

RocksDB 开始 snapshot 的日志如下:

2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

snapshot 结束的日志如下:

2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

 2.2.6 异步阶段做的慢

对于异步阶段来说,tm 端主要将 state 备份到持久化存储上,对于非 RocksDBBackend 来说,主要瓶颈来自于网络,这个阶段可以考虑观察网络的 metric,或者对应机器上能够观察到网络流量的情况(比如 iftop
)。

对于 RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能。另外对于 RocksDBBackend 来说,如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试考虑开启多线程上传功能 [3]。








文章转载自大数据启示录,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论