流数据如何在Flink 不同的TaskManager之间流动?这是任何一位想深入研究Flink内核的伙伴都绕不过去的逻辑,之前写过一篇文章《流数据如何在Flink新线程模型上流动》,主要通过Demo调式研究了Flink在local模式下的数据流动过程,其完全是在同一个TaskManager内,Inputchannel使用的都是LocalInputchannel,这次好奇下RemoteInputChannel和ResultPartition吧。消费者生产者模型应该都听说过,让我们一起分析BufferBuilder和BufferConsumer组合是如何完美的应用到Flink上,从而达到数据处理线程和发送线程的解耦。经过大量源码研究,笔者终于理清了实现逻辑,本文试图用一个最简单的图去描述它,如下图其流程逻辑清晰明了,建议读者结合源码去看加深印象。

ResultPartition由ResultsubPartition组成,ResultsubPartition是streaming和batch的抽象接口,分别对应PipelinedSubpartition和BoundBlockingSubpartition(其产生的是文件)。
UserFunction是我们在算子中自定义的数据处理函数。
上图序列化器是SpanningRecordSerializer;
反序列化器是SpillingAdaptiveSpanningRecordDeserializer;
BufferBuilder 是生产者,其主要是用于把经过用户自定义函数(UserFunction)处理,且序列化后的数据写入到Flink自己的内存管理模型MemorySegment上。
BufferConsumer是消费者。其主要是对BufferBuilder产生的数据进行包装,然后放到flink的LocalBufferPool里,这个BufferPool是堆外内存,Flink和Netty共用这个数据,可以直接发给其它TaskManager的NettyServer。
生产端:
recordwriter.emit函数触发写入流程,逐步调用channeSelectRecordWriter.requestNewBufferBuilder,申请BufferBuilder(创建bufferbuilder和bufferconsumer),最终调用resultPartition.addbufferConsumer分流批调用piplinedSubpartition.add(流调用)->notifyDataAvailable(本地调用localinputchannel) ->creditBasedSequenceNumberingViewReader.notifyDataAvailable(远程调用)-再通过netty调用PartitionResuqestQueue.usereventTriggered函数调用enqueueAvailableReader为availableReaders赋值。
当nettyt通道可写时,调用channelWritabilityChanged获取creditBasedSequenceNumberingViewReader.getNextBuffer获取buffer数据并将客户端的numCreditsAvailable执行减一运算返回实现流控,最终封装成BufferResponse发送给消费者。
注意:recordWriter会定义一个固定间隔执行的flush线程,用于通知PartitionResuqestQueue内的Reader,把bufferbuilder的数据定时发送。
消费端:
CreditBasedPartitionRequestClientHandler.channelRead调用decodMsg()->decodeBufferOrEvent()开始处理接收的数据,调用RemoteinputChannel.onBuffer放到ReceivedBuffers队列,然后进行credit处理或者通知notifyChannelNonEmpty(该接口是SingleInputGate.notifyChannelNonEmpty.)内部调用queueChannel调用inputchannelswithdata.notifyAll(),availabilityHelper.getUnavailableToResetAvailable()返回jointFuture,在streamtask的processInput函数可以看到,这个jointFuture被定义为当处于complete状态时调用DefaultActionSuspension.resume,会发送一个名为“resumedefault action”的空mail到mailbox,唤醒在mailbox.take函数处于等待的算子线程。
以上逻辑是个人调试源码的调用链,如有异议,欢迎交流学习。
感谢关注,谢谢!




