前言
本次故障反馈Flink消费Kafka性能不达标、数据积压达到几十亿量级。笔者尝试微调了Flink作业资源、重启之后出现神奇一幕:几十亿积压数据瞬间消费完了!!!惊呆了老铁,事出反常必有妖,详见正文吧。文章首发微信公众号:大数据从业者,其它均为转载,原创不易,欢迎您点赞关注推荐转发,感谢!

问题分析
查看FlinkUI发现已处理数据量并没有几十亿。难道重启作业offset策略为latest?丢数据了?经过多轮沟通确认offset策略都是提交到Kafka持久化的、作业没开启checkpoint。kafka-consumer-group.s持续观察数据确实没有积压,Flink作业和Kafka服务均未见异常日志,决定观察一段时间(PS:没搞清楚之前,可不敢调参重启了,万一真丢数据算谁的)。两天之后,出现同样的现象:数据积压严重、重启Flink作业、数据积压就消失了。强调一点,这两天入库数据量正常,也就是说不存在丢数据可能!
所谓积压就是current offset跟不上end offset,那么存在两种可能:一种是消费速度确实跟不上;另一种是消费速度跟得上,只是current offset没提交成功或者压根没提交。但是为啥重启动作会导致current offset瞬间跟上end offset呢?(PS:消费者也没跳过offset)。这里到底有什么猫腻?源码中寻找答案!
源码分析
消费者提交offset请求
消费者提交offset分为两种:commitOffsetsSync同步提交和commitOffsetsAsync异步提交,都对应于ConsumerCoordinator类。
同步提交发生于KafkaConsumer类close方法,调用ConsumerCoordinator类close方法,调用maybeAutoCommitOffsetsSync方法,调用commitOffsetsSync方法实现同步提交,如图所示:

commitOffsetsSync方法通过sendOffsetCommitRequest方法发送请求到服务端,如图所示:

而异步提交发生于KafkaConsumer类poll方法,调用updateAssignmentMetadataIfNeeded方法,调用ConsumerCoordibator类poll方法,调用maybeAutoCommitOffsetsAsync方法,调用doAutoCommitOffsetsAsync方法,调用commitOffsetsAsync方法实现异步提交,如果遇到可重试异常,定时休眠重试,如图所示:

同样,commitOffsetsAsync方法也是通过sendOffsetCommitRequest方法发送请求到服务端,如图所示:

最终,sendOffsetCommitRequest方法将offset发送给指定服务端coordinator,并设置回调类OffsetCommitResponseHandler用来解析服务端处理请求之后返回的结果,如图所示:

Server响应请求
进程启动入口KafkaServer类startup方法会实例化KafkaApis类和KafkaRequestHandlerPool类,如图所示:

KafkaApis类继承自ApiRequestHandler其实就是producer、consumer与broke之间各种请求的接收处理方法,而KafkaRequestHandlerPool是用于执行这些处理方法的线程池。KafkaApis将各种请求映射到对应的处理方法,包括但不限于PRODUCE(写)、FETCH(读)、OFFSET_COMMIT(提交offset)等,如图所示:

其中,handleOffsetCommitRequest就是服务端处理提交offset请求的方法,该方法继续调用groupCoordinator.handleCommitOffsets方法,继续调用doCommitOffsets方法,继续调用groupManager.storeOffsets方法,继续调用appendForGroup方法,继续调用replicaManager.appendRecords方法完成offset写入到__consumer_offset主题,并层层返回结果。handleOffsetCommitRequest最终返回给消费者端OffsetCommitResponse结果类,如图所示:

消费者解析请求结果
回调类OffsetCommitResponseHandler中handle方法对Server端返回的OffsetCommitResponse结果进行解析处理,如图所示:

注意关键日志,如果提交成功打印DEBUG日志:
log.debug("Committed offset {} for partition {}", offset, tp);
如果提交异常为RetriableException打印WARN日志:
log.warn("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
如果提交异常为其他打印ERROR日志:
log.error("Offset commit failed on partition {} at offset {}: {}", tp, offset, error.message());
继续看其他异常的处理,发现有标记Coordinator dead动作,如图所示:

至此,offset提交、处理、返回的关键流程分析完成!
分析小结
基于上述源码分析,现场搜索offset commit相关的WARN、ERROR级别的日志,反馈都没有。那问题基本定位在markCoordinatorUnknown(error)方法!如上图所示,条件有三个:COORDINATOR_NOT_AVAILABLE、NOT_COORDINATOR、REQUEST_TIMED_OUT,都继承自RetriableException,也就是说会重试。
故障现象只有一种解释:异步提交失败导致重试但是一直重试失败,而同步提交是成功的!打开ConsumerCoordinator类DEBUG级别日志印证想法,打开方式如下:
logger.consumer_coordinator_shaded.name = org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorlogger.consumer_coordinator_shaded.level = DEBUGlogger.consumer_coordinator_shaded.additivity = falselogger.consumer_coordinator_shaded.appenderRef.file.ref = MainAppender
具体效果可见实验室复现的情况!
问题复现
消费者offset提交到Kafka存储在__consumer_offset主题的单一分区,分区号的计算逻辑笔者从源码摘出来,执行结果如图所示:

说明:__consumer_offset分区数默认50,GroupID=f2,计算可知PartitionID=12。
通过如下命令可以验证上述算法结果正确性:
bin/kafka-console-consumer.sh \--topic __consumer_offsets \--bootstrap-server localhost:9092 \--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \--consumer.config config/consumer.properties \--partition 12 --from-beginning
方便复现,调小实验室Kafka配置offsets.commit.timeout.ms=1000,持续压测一段时间,成功复现,如图所示:

说明:offset提交超时之后,进入到重试,一直重复一直报错:The coordinator is not availabled,对应于COORDINATOR_NOT_AVAILABLE,属于RetriableException可重试异常。虽然一直重复报错与现场相同,但是现场没有offset提交超时异常,不过可以肯定的是根据现场日志显示,确实进入markCoordinatorUnknown方法,如图所示:

解决措施
问题明确:异步提交失败导致进入重试但是存在bug一直重试失败,而同步提交是成功的!那么,对比异步提交方法commitOffsetsAsync和同步提交方法commitOffsetsSync,不难发现,commitOffsetsAsync重试时缺少coordinator可用的处理逻辑。检索发现社区有个类似问题,影响版本为2.6.1, 2.7.2, 2.8.1, 3.0.0, 3.1.0,修复版本为3.2.1:

请自行参考修复,笔者Kafka版本修复代码如下:

详见github:
https://github.com/felixzh2020/kafka/commit/b33b8553f7a8d41740b0d0c973317dbf31b05fe0
基于修复代码重新打包kafka、Flink作业,验证效果如下:

如上图所示:遇到offset提交超时之后,寻找coordinator,重试提交offset成功!
结束
至此,原以为是性能瓶颈谁知道是这么个bug的问题处理完成!文章首发微信公众号:大数据从业者,其它均为转载,原创不易,欢迎您点赞关注推荐转发,感谢!





