暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL算子篇(二)聊聊选择与投影

大常哥的私房菜 2022-07-02
1531


在算子篇的开篇,我们明确要研究的Flink SQL三个核心抽象——转化、操作和函数,这些核心抽象适用于所有的Flink SQL算子。如果把这些算子的Transofrmation、Operator和Function每个字段和方法都梳理一遍,这不仅会分散我们的注意力,也会令文章失去主干。所以我们需要在某个维度上将算子进行聚类,某一类别的算子具有相似的行为,我们可以在这个模式内聚焦特定的内容。我这里所采用的模式划分的凭据是计算模型。

*** 一、算子的计算模型 ***

这里需要先说明,计算模型的内涵非常的广泛,一个数学公式或者像Flink官网中显示的一些计算流程都可以叫做计算模型。在本篇章中,计算模型指的是在某类特定算子的输入输出的数学模型,或者在关于信号的研究中,叫做响应模型。当然,模型不会特别的严格遵循数学的写法,也不会推导具体的响应模式,在Flink的应用中我们更加关注输入与输出之间的依赖关系。

大体而言,Flink SQL算子的计算模型可以分为三种,这里借鉴了一部分信号处理的研究思维。分别为:

这三种计算模型的区别是输出依赖之前输入序列的数据个数。我们先简单介绍一下每个计算模型需要被关注的内容,后续的章节中会进行更为详细的说明:
(1) 第一种计算模型输出只依赖于当前的输入,不依赖任何之前时刻的输入,这种计算显然不需要额外的存储结构,我们只需要关注计算逻辑,对应于系统的响应函数。
(2) 第二种计算模型输出依赖于当前时刻以及之前的有限的输入,显然这里需要额外的存储结构进行存储,除了计算逻辑,我们还应该关注Flink中的状态存储。该模型在实际使用时对应的应用为窗口计算,会在该公式上进行一定的变形,其结果的就是这段时间内的k个输入只有唯一的输出,因此我们还应该关注Flink对于时间的处理。
(3)第三种计算模型输出依赖于当前时刻以及之前的无限的输入,这需要某些存储结构,同时显然不能采用计算模型(2)的变形形式。幸运的是,在模型应用中,比如汇总计算时该式可以改写为递归的形式,可以用有限的存储空间。因此,我们在第三种计算模型中主要关注计算逻辑、存储结构以及可能的递归形式下的存储与计算。

所以,在本系列的章节中,我们会首先研究对应(1)、(3)计算模型的算子,这些算子都是非窗口计算的;然后再研究计算模型(2)对应的算子,也就是窗口计算相关算子。

*** 二、选择与投影 ***

根据数据库的理论,选择与投影是关系型数据库操作中是两个基本算符。选择常用σ表示指从数据库中按照某些条件过滤记录,从而减少结果集的行记录数;投影常用π表示,指筛选与派生出结果集所需要的字段。本节中我们将会看到Flink SQL是如何实现这两种操作的。

2.1 数据集与SQL案例

Flink SQL中的选择与投影,实际上都对应了流执行节点StreamExecCalc,也可能是本系列唯一的计算模型(1)对应的执行节点。在正式探究源码前,这里需要一个数据集以及围绕该数据集的SQL案例,以便在后续源码探究阶段能以一种更容易理解的形式进行说明。

数据集和案例代码,都相当简单,好处是我们不必花费心思理解SQL的含义,同时在Blink进行编译时不会引入我们不关注的模块和特性。这个简单的数据集在下面的表格中:

name
score
Tom
12
John
15
Tom
18
Tom
19
这个数据集是一些人实时的数据流,这些数据是按照时间顺序发生的。

案例SQL如下:

    select  name
    ,score+1
    from test
    where score>15

    代码显示Flink需要从这些实时分数信息中找到大于15分的数据,并且将得分加1,可以简单理解为“score>15”对应记录选择,“select name,score+1”对应字段投影。

    这里需要声明,碍于主旨和篇幅,笔者不会粘贴运行这些SQL的相关代码,也不会说明运行环境的信息(唯一需要声明的就是笔者采用了Flink 1.13.2版本以及Blink计划器),我们只需要关注这些算子相关源码就好。

    2.2 核心抽象的生成逻辑

    Flink SQL中的选择与投影对应计算模型(1),也就是说当前时刻的输出只与当前时刻的输入有关,我们只需要研究其计算逻辑就好。这些计算逻辑都被封装在了Flink的核心抽象中,在代码研究中,这些核心抽象的Java类型对于代码理解和SQL行为都有至关重要的影响,因此我们需要先关注这些核心抽象生成的位置,也就是执行节点的translateToPlan方法。

    选择与投影两种操作在Flink中对应同一个名为StreamExecCalc的执行节点,StreamExecCalc只有两个构造方法,其余的方法都继承自其父类CommonExecCalc中,因此我们考察CommonExecCalc#translateToPlan方法。

      protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
      final ExecEdge inputEdge = getInputEdges().get(0);
      // 取得执行图上的前驱 Transformation
      final Transformation<RowData> inputTransform =
      (Transformation<RowData>) inputEdge.translateToPlan(planner);
      // 得到 CodeGen 的上下文
      final CodeGeneratorContext ctx =
      new CodeGeneratorContext(planner.getTableConfig())
      .setOperatorBaseClass(operatorBaseClass);
      // 生成 CodeGenOperatorFactory 类型的 Operator 算子
      final CodeGenOperatorFactory<RowData> substituteStreamOperator =
      CalcCodeGenerator.generateCalcOperator(
      ctx,
      inputTransform,
      (RowType) getOutputType(),
      JavaScalaConversionUtil.toScala(projection),
      JavaScalaConversionUtil.toScala(Optional.ofNullable(this.condition)),
      retainHeader,
      getClass().getSimpleName());
      // 构造出 Transformation
      return new OneInputTransformation<>(
      inputTransform,
      getDescription(),
      substituteStreamOperator,
      InternalTypeInfo.of(getOutputType()),
      inputTransform.getParallelism());
      }

      从该方法的代码执行过程中我们可以窥见Flink SQL执行节点生成Transformation的一般范式:

      1. 获取上游的Transformation和其他执行信息,用于构造当前节点。

      2. 按顺序依次生成Function、Operator。
      3. 将上游Transformation和2中的Function、Operator为参数构造出当前节点的Tranformation。

      源码中与范式不同的是并没有显式的生成Function和Operator,而是创建了一个CodeGenOperatorFactory,即采用CodeGen技术的Operator工厂类。首先,从类型上来讲,该类与Operator是等价的,因为在OneInputTransformation中,Operator与OperatorFactory均可以参与构造,当采用Operator构造时也会采用SimpleOperatorFactory#of方法拿到工厂类,转而采用工厂类构造:
        public OneInputTransformation(
        Transformation<IN> input,
        String name,
        OneInputStreamOperator<IN, OUT> operator,
        TypeInformation<OUT> outputType,
        int parallelism
            // 调用Operator工厂类为参数的构造方法
        this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);
        }
        public OneInputTransformation(
        Transformation<IN> input,
        String name,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<OUT> outputType,
        int parallelism) {
        super(name, outputType, parallelism);
        this.input = input;
        this.operatorFactory = operatorFactory;
        }
        然后,从功能上来讲,CodeGen技术是一种Java源代码生成的技术,通过模板生成的形式翻译SQL当中的表达式,而不是Java的内部机制。这样做的好处是,减少Java类和方法的数量。举个例子,SQL语句:
          select a+b from test
          这条语句应该对应多少个Java方法呢?假设这个方法为add,具有两个输入参数,然而,可能出现的类型组合有很多,比如a为int、float、double、long等等,b和返回值同理。那么采用Java实现仅两个输入的求和就要方法重载很多次,显然采用Java语言机制实现特别麻烦。CodeGen技术就跳出了Java语言的机制,转而通过一种用语言生成源代码的方式解决这个问题。既然add方法的参数类型可能的组合数很多,那就消除可能性,等到参数确定时直接生成这个方法源代码就好了。那么只需要加法、乘法等少量模板就可以产生各种各样的源代码。所以我们可以看到CalcCodeGenerator#generateCalcOperator方法包含大量的文本解析过程,这里就不再过多赘述。后面,我们将会看到案例代码的Codegen生成类源代码。
          2.3 选择、投影的执行逻辑
          在聊完了选择与投影的执行类、执行环境的生成过程后,本节我们聚焦于这些SQL操作的Java语言实现。在上一篇文章中聊到过Flink的Transformation在执行时主要起到上下游连接的作用,相当于执行图,在任务运行时真正处理数据的功能封装在Operator中。在CommonExecCalc#translateToPlan中采用CodeGenOperatorFactory类取代,生成的Operator代码在其内部对象GeneratedClass的String类型的code变量中,在任务执行时Task会采用Janino进行编译后进行加载,成为真正的Operator对象。
            public class StreamExecCalc$11 extends org.apache.flink.table.runtime.operators.TableStreamOperator
            implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
            ...
            }
            源码显示,CodeGen生成类是一个单输入的Operator,表明上游只有一个流,对于计算模型(1),我们主要关注Operator的processElement方法,该方法封装了SQL操作的核心执行逻辑。
              public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
                // 接收输入数据
              org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
                // 一次性创建后续需要用到的所有字段
              int field$4;
              boolean isNull$4;
              boolean isNull$5;
              boolean result$6;
              org.apache.flink.table.data.binary.BinaryStringData field$7;
              boolean isNull$7;
              org.apache.flink.table.data.binary.BinaryStringData field$9;
              boolean isNull$10;
              int result$11;
                // 获取score的值
              isNull$4 = in1.isNullAt(1);
              field$4 = -1;
              if (!isNull$4) {
              field$4 = in1.getInt(1);
              }
                // 判断score大于15
              isNull$5 = isNull$4 || false;
              result$6 = false;
                if (!isNull$5) {
                  result$6 = field$4 > ((int) 15);
              }
                // 如果score大于15,则输出name和score+1的结果记录
                if (result$6) {
              isNull$7 = in1.isNullAt(0);
              field$7 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
              if (!isNull$7) {
              field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
              }
              field$9 = field$7;
              if (!isNull$7) {
              field$9 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$8.copy(field$9));
                }
                out.setRowKind(in1.getRowKind());
              if (isNull$7) {
              out.setNullAt(0);
              } else {
              out.setNonPrimitiveValue(0, field$9);
                }
              isNull$10 = isNull$4 || false;
              result$11 = -1;
                if (!isNull$10) { 
                  result$11 = (int) (field$4 + ((int) 1));  
                }  
              if (isNull$10) {
              out.setNullAt(1);
              } else {
              out.setInt(1, result$11);
                }             
                output.collect(outElement.replace(out));
                }
              }
              由于通过模板生成的代码,因此源码中字段、方法和类名具有"$"符号,这里贴出了全部代码,并增加了注释方便阅读。生成的源码特别细节和繁琐,这是为了让模板可以在细粒度上灵活添加处理代码,以应对各种可能出现的计算场景。除去各种异常判断,正常情况下的执行代码只有以下几行:
                public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
                接收输入数据
                org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
                  ...
                  // 获取score的值
                  field$4 = in1.getInt(1);
                  // 判断score大于15
                result$6 = field$4 > ((int) 15);
                  ...
                / 如果score大于15,则输出name和score+1的结果记录
                if (result$6) {
                ...
                    // 拿到并设置name的值
                field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
                    ...
                    field$9 = field$7;
                    ...
                    out.setNonPrimitiveValue(0, field$9);
                    ...
                    // 计算score+1并设置到输出结果中
                    result$11 = (int) (field$4 + ((int) 1));  
                    ...
                out.setInt(1, result$11);
                    ...
                    // 输出
                output.collect(outElement.replace(out));
                }
                }
                源码显示,在SQL中的选择与投影的执行过程是选择先行,当过滤条件成立时,才进行投影操作并进行输出;否则,不会产生任何输出。这与输出结果是相符的。
                  +I[John, 19]
                  +I[Tom, 19]
                  +I[Tom, 20]
                  到这里,Flink SQL中选择与投影算子的执行过程与执行逻辑就介绍完了。虽然只是一个普通的例子,但是仍具有代表性,这一类SQL都具有相似的处理过程,从输入到输出的行为上是统一的。
                  *** 三、总结 ***

                  本篇内容可以概括如下:

                  1.抽象了Flink SQL的三种计算模型及其响应模式。

                  2.在Flink SQL中选择与投影操作属于计算模型(1),其行为只与计算逻辑有关。

                  3.介绍了选择与投影对应的算子的生成逻辑和执行过程,并总结了输入输出行为。

                  笔者也会继续更新Flink SQL算子篇的后续内容,敬请期待!


                  文章转载自大常哥的私房菜,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论