在算子篇的开篇,我们明确要研究的Flink SQL三个核心抽象——转化、操作和函数,这些核心抽象适用于所有的Flink SQL算子。如果把这些算子的Transofrmation、Operator和Function每个字段和方法都梳理一遍,这不仅会分散我们的注意力,也会令文章失去主干。所以我们需要在某个维度上将算子进行聚类,某一类别的算子具有相似的行为,我们可以在这个模式内聚焦特定的内容。我这里所采用的模式划分的凭据是计算模型。
这里需要先说明,计算模型的内涵非常的广泛,一个数学公式或者像Flink官网中显示的一些计算流程都可以叫做计算模型。在本篇章中,计算模型指的是在某类特定算子的输入输出的数学模型,或者在关于信号的研究中,叫做响应模型。当然,模型不会特别的严格遵循数学的写法,也不会推导具体的响应模式,在Flink的应用中我们更加关注输入与输出之间的依赖关系。

所以,在本系列的章节中,我们会首先研究对应(1)、(3)计算模型的算子,这些算子都是非窗口计算的;然后再研究计算模型(2)对应的算子,也就是窗口计算相关算子。
根据数据库的理论,选择与投影是关系型数据库操作中是两个基本算符。选择常用σ表示,指从数据库中按照某些条件过滤记录,从而减少结果集的行记录数;投影常用π表示,指筛选与派生出结果集所需要的字段。本节中我们将会看到Flink SQL是如何实现这两种操作的。
Flink SQL中的选择与投影,实际上都对应了流执行节点StreamExecCalc,也可能是本系列唯一的计算模型(1)对应的执行节点。在正式探究源码前,这里需要一个数据集以及围绕该数据集的SQL案例,以便在后续源码探究阶段能以一种更容易理解的形式进行说明。
数据集和案例代码,都相当简单,好处是我们不必花费心思理解SQL的含义,同时在Blink进行编译时不会引入我们不关注的模块和特性。这个简单的数据集在下面的表格中:
| name | score |
| Tom | 12 |
| John | 15 |
| Tom | 18 |
| Tom | 19 |
案例SQL如下:
select name,score+1from testwhere score>15
代码显示Flink需要从这些实时分数信息中找到大于15分的数据,并且将得分加1,可以简单理解为“score>15”对应记录选择,“select name,score+1”对应字段投影。
这里需要声明,碍于主旨和篇幅,笔者不会粘贴运行这些SQL的相关代码,也不会说明运行环境的信息(唯一需要声明的就是笔者采用了Flink 1.13.2版本以及Blink计划器),我们只需要关注这些算子相关源码就好。
Flink SQL中的选择与投影对应计算模型(1),也就是说当前时刻的输出只与当前时刻的输入有关,我们只需要研究其计算逻辑就好。这些计算逻辑都被封装在了Flink的核心抽象中,在代码研究中,这些核心抽象的Java类型对于代码理解和SQL行为都有至关重要的影响,因此我们需要先关注这些核心抽象生成的位置,也就是执行节点的translateToPlan方法。
选择与投影两种操作在Flink中对应同一个名为StreamExecCalc的执行节点,StreamExecCalc只有两个构造方法,其余的方法都继承自其父类CommonExecCalc中,因此我们考察CommonExecCalc#translateToPlan方法。
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {final ExecEdge inputEdge = getInputEdges().get(0);// 取得执行图上的前驱 Transformationfinal Transformation<RowData> inputTransform =(Transformation<RowData>) inputEdge.translateToPlan(planner);// 得到 CodeGen 的上下文final CodeGeneratorContext ctx =new CodeGeneratorContext(planner.getTableConfig()).setOperatorBaseClass(operatorBaseClass);// 生成 CodeGenOperatorFactory 类型的 Operator 算子final CodeGenOperatorFactory<RowData> substituteStreamOperator =CalcCodeGenerator.generateCalcOperator(ctx,inputTransform,(RowType) getOutputType(),JavaScalaConversionUtil.toScala(projection),JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),retainHeader,getClass().getSimpleName());// 构造出 Transformationreturn new OneInputTransformation<>(inputTransform,getDescription(),substituteStreamOperator,InternalTypeInfo.of(getOutputType()),inputTransform.getParallelism());}
从该方法的代码执行过程中我们可以窥见Flink SQL执行节点生成Transformation的一般范式:
获取上游的Transformation和其他执行信息,用于构造当前节点。
按顺序依次生成Function、Operator。 将上游Transformation和2中的Function、Operator为参数构造出当前节点的Tranformation。
public OneInputTransformation(Transformation<IN> input,String name,OneInputStreamOperator<IN, OUT> operator,TypeInformation<OUT> outputType,int parallelism// 调用Operator工厂类为参数的构造方法this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);}public OneInputTransformation(Transformation<IN> input,String name,StreamOperatorFactory<OUT> operatorFactory,TypeInformation<OUT> outputType,int parallelism) {super(name, outputType, parallelism);this.input = input;this.operatorFactory = operatorFactory;}
select a+b from test
public class StreamExecCalc$11 extends org.apache.flink.table.runtime.operators.TableStreamOperatorimplements org.apache.flink.streaming.api.operators.OneInputStreamOperator {...}
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {// 接收输入数据org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();// 一次性创建后续需要用到的所有字段int field$4;boolean isNull$4;boolean isNull$5;boolean result$6;org.apache.flink.table.data.binary.BinaryStringData field$7;boolean isNull$7;org.apache.flink.table.data.binary.BinaryStringData field$9;boolean isNull$10;int result$11;// 获取score的值isNull$4 = in1.isNullAt(1);field$4 = -1;if (!isNull$4) {field$4 = in1.getInt(1);}// 判断score大于15isNull$5 = isNull$4 || false;result$6 = false;if (!isNull$5) {result$6 = field$4 > ((int) 15);}// 如果score大于15,则输出name和score+1的结果记录if (result$6) {isNull$7 = in1.isNullAt(0);field$7 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;if (!isNull$7) {field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));}field$9 = field$7;if (!isNull$7) {field$9 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$8.copy(field$9));}out.setRowKind(in1.getRowKind());if (isNull$7) {out.setNullAt(0);} else {out.setNonPrimitiveValue(0, field$9);}isNull$10 = isNull$4 || false;result$11 = -1;if (!isNull$10) {result$11 = (int) (field$4 + ((int) 1));}if (isNull$10) {out.setNullAt(1);} else {out.setInt(1, result$11);}output.collect(outElement.replace(out));}}
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {接收输入数据org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();...// 获取score的值field$4 = in1.getInt(1);// 判断score大于15result$6 = field$4 > ((int) 15);.../ 如果score大于15,则输出name和score+1的结果记录if (result$6) {...// 拿到并设置name的值field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));...field$9 = field$7;...out.setNonPrimitiveValue(0, field$9);...// 计算score+1并设置到输出结果中result$11 = (int) (field$4 + ((int) 1));...out.setInt(1, result$11);...// 输出output.collect(outElement.replace(out));}}
+I[John, 19]+I[Tom, 19]+I[Tom, 20]
本篇内容可以概括如下:
2.在Flink SQL中选择与投影操作属于计算模型(1),其行为只与计算逻辑有关。
笔者也会继续更新Flink SQL算子篇的后续内容,敬请期待!




