
*** 一、表的输入输出模式 ***
这套体系在用到以离线数据集为运算基石的关系代数上是有问题的,当一条SQL的计算结果不仅依赖于当前的输入,还依赖于历史和未来的输入记录,那么当一条输入记录到来时,是否应当进行输出呢,如果输出,应当输出什么呢?
这个问题可以通过上一篇文章中的计算模型(2)和计算模型(3)进行回答,计算模型(2)是将这些数据进行分割,将无界数据集划分为有界数据集,只计算一段时间内的输入记录产生的结果,此时的语义和行为就可以与离线情况下一致;如果采用计算模型(3),要直接对无界数据集进行计算,那么,只有当数据流到尽头时的最后一次结果才与离线SQL一致,显然,Flink并不能只输出一次。那么,可以引入一个机制,对于每一次输入的计算结果,可以通过一个特殊符号将上次的输出结果作废,再输出新的结果,这样从这个输出流的角度来看,最终合并后的结果就与离线SQL一致了。

注意,这里的模型进行了简化,在实际的计算中,撤回不是必须的,而是在有必要修改状态时才进行撤回。引入了撤回机制,就可以在Flink SQL中实现计算模型(3)的操作。
引入撤回机制后,流的输入输出模式变得多样化。使用动态表时,支持的输入输出流的类型有:
(1)Append流:只支持插入模式的流。
(2)Changelog流:支持插入模式、更新模式和撤回模式的流。更新模式和撤回模式的区别是,更新模式在指定唯一键后仅用一条更新记录对下游进行更新,而撤回模式需要一条撤回记录和一条插入记录实现更新。由于这两种模式的效果相同,并且我们只研究当前操作的算子,后面不再区分撤回与更新,统一采用撤回称呼。
要说明的是,在旧版本的Flink-Kafka架构下,大多数实时数仓场景下只支持追加写入,实际应用中Changelog流还未被广泛使用。因此,对于上游输入,默认为Append流,我们只研究在此基础上的输出流的模式。
之所以在介绍分组聚合之前叙述Flink的运行机制和设计思想,归根结底是想说明一点,就是Flink在实现这个计算模型(3)的操作时,可能需要借助撤回机制,并且相对于研究选择与投影操作,我们需要关注更多的内容。
回忆对计算模型(1)的研究,这种模型对应的算子是无记忆的,计算时不需要借助之前的输入信息,研究时通过核心抽象的生成逻辑确定了实现类的层次和类型,而后通过对实现类处理逻辑的研究总结了选择与投影这类操作的行为。到了计算模型(3)时,需要额外关注对于状态的实现及其行为,比如状态的内容、读写过程等。
在进行分组聚合时,状态存储的内容有两种选择,第一种是直接保存历史输入的明细,总是可以根据明细计算出汇总结果,其缺点就是当历史明细的存储和计算相当耗费资源;第二种是保存汇总的结果,这隐含了SQL中的分组聚合总是可以以一种递归的形式实现的假设:

数据集依然之前的test表,有name和score两列:
| score | |
| Tom | 12 |
| John | 15 |
| Tom | 18 |
| Tom | 19 |
需要计算的是某个人的得分次数:
select name,count(1) as cntfrom testgroup by name
这个计算相当简单,但足够我们用来理解Flink SQL中分组汇总操作的实现了。
Flink SQL中,分组聚合操作对应的StreamExecNode为StreamExecGroupAggregate,其核心抽象生成逻辑在translateToPlanInternal方法中:
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {...// 获取上游输入的Transfomationfinal ExecEdge inputEdge = getInputEdges().get(0);final Transformation<RowData> inputTransform =(Transformation<RowData>) inputEdge.translateToPlan(planner);final RowType inputRowType = (RowType) inputEdge.getOutputType();// 创建聚合操作的Codegen生成器final AggsHandlerCodeGenerator generator =new AggsHandlerCodeGenerator(new CodeGeneratorContext(tableConfig),planner.getRelBuilder(),JavaScalaConversionUtil.toScala(inputRowType.getChildren()),true).needAccumulate();...// 将生成器转化为聚合操作的处理器final GeneratedAggsHandleFunction aggsHandler =generator.generateAggsHandler("GroupAggsHandler", aggInfoList);...// Operator类型为OneInputStreamOperator,// 并根据是否开启MiniBatch优化生成不同的汇// 总函数,由于采用了Codegen技术,这里的汇// 总函数实际上是Java源码。final OneInputStreamOperator<RowData, RowData> operator;if (isMiniBatchEnabled) {...} else {GroupAggFunction aggFunction =new GroupAggFunction(aggsHandler,recordEqualiser,accTypes,inputCountIndex,generateUpdateBefore,tableConfig.getIdleStateRetention().toMillis());operator = new KeyedProcessOperator<>(aggFunction);}// 根据算子创建Transformationfinal OneInputTransformation<RowData, RowData> transform =new OneInputTransformation<>(inputTransform,getDescription(),operator,InternalTypeInfo.of(getOutputType()),inputTransform.getParallelism());...return transform;}
这些核心抽象的生成逻辑比较简单,大致是按照Function、Operator和Transformation依次生成的,这也是SQL映射到Flink框架的经典构造过程。其中,Function的处理逻辑采用了CodeGen的技术,这也是将个性化的SQL汇总代码和通用逻辑进行了解耦。在生成Function和Operator时,会根据是否开启Mini-Batch而有两条构造支路,受限于篇幅限制,我们聚焦于流计算时的情况。
Transformation主要负责构造执行图,真正的执行入口在Operator中,即KeyedProcessOperator#processElement方法。
public void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);context.element = element;// 处理输入记录的具体方法// userFunction为GroupAggFunctionuserFunction.processElement(element.getValue(), context, collector);context.element = null;}
其会调用GroupAggFunction#processElement方法,分组汇总的执行逻辑基本都在这里:
public void processElement(RowData input, Context ctx, Collector<RowData> out)throws Exception {// 获得当前记录的分组键,以及之前的累加器RowData currentKey = ctx.getCurrentKey();boolean firstRow;RowData accumulators = accState.value();// 如果之前的累计结果为空,说明为当前输入记录为分组键的首行if (null == accumulators) {...firstRow = true;accumulators = function.createAccumulators();} else {firstRow = false;}// 设置汇总函数的累加器为累计状态,这个汇总函数就是Codegen生成的// Java汇总源码的实例化类function.setAccumulators(accumulators);// 获得汇总值RowData prevAggValue = function.getValue();// 这里会根据该输入是插入还是撤回做不同的处理if (isAccumulateMsg(input)) {// 在历史汇总值的基础上增量计算汇总结果function.accumulate(input);} else {// 我们不研究输入流有撤回的情形...}// 获得累计当前输入的汇总结果RowData newAggValue = function.getValue();// 获得当前累加器accumulators = function.getAccumulators();// 正常情形,输入计数器不为0,实际上是创建的累加器不为空if (!recordCounter.recordCountIsZero(accumulators)) {// 更新状态accState.update(accumulators);// 如果非首行,可能需要撤回if (!firstRow) {// 如果状态过期或者状态未发生改变,则不进行输出if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {return;} else {// 否则需要进行撤回if (generateUpdateBefore) {resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);out.collect(resultRow);}resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);}} else {// 首行记录直接插入汇总结果resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);}out.collect(resultRow);} else {// 异常的情况,记录计数器为0...}}
Flink SQL的分组汇总计算的逻辑还是条理清晰的,整个执行过程可以概括为:
1.获取之前的累加器。注意,代码中区分了累加器和汇总值,但是在非窗口汇总的情况下,累加器和汇总值基本上是一个概念。
3.将最新的汇总结果存入状态,如果计算前后状态不变,则直接返回;否则,先撤回原有的结果,再将新的结果插入下游。
分组汇总的执行过程体现了实现计算模型(3)的基本手段,先取出状态,再将输入记录与状态一起计算,撤回需要修改的历史结果,输出新的结果;也能看出在状态的存储形式上选择了递归,即保存当前输入前的汇总值,而非明细。其输出行为也与计算过程一致,当分组键“Tom”上的输入记录大于1条时,会不断进行撤回和插入操作,合并后的值与离线结果一致:
+I[Tom, 1]+I[John, 1]-U[Tom, 1]+U[Tom, 2]-U[Tom, 2]+U[Tom, 3]
本篇内容可以概括如下:
2.详细说明了计算模型(3)的性质,并拓展了其递归形式。
Flink SQL算子篇持续更新中,敬请期待!




