暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL算子篇(三)不简单的分组聚合

大常哥的私房菜 2022-07-30
1471
离线的数据查询时,分组聚合是常用的操作,可以计算离线数据集某些特定分组字段下聚合值的结果集。在Hive等基于批处理的数据计算引擎中,离线数据集的是一种历史的有界数据集,分组聚合等查询语句可以在全量历史数据上得到正确的结果;而Flink常常主要处理的是从确定的过去到不可预见的未来这条时间轴下无界的流数据。

那么这里就会有一些问题,当某个分组的数据还没到齐时,Flink如何计算汇总值?算子的行为与离线时又有什么区别呢

*** 一、表的输入输出模式 ***

在解答开篇的问题前,这里有必要先引入定义Flink动态表上流的模式,可以简单的理解为输入输出模式。在Flink中,从抽象角度的来看,动态表与流是相同的东西,可以称之为“流表二相性”,当需要用SQL或Table API运行应用程序时,对于用户而言,所看到的就是动态表。
1.1 计算模型

这套体系在用到以离线数据集为运算基石的关系代数上是有问题的,当一条SQL的计算结果不仅依赖于当前的输入,还依赖于历史和未来的输入记录,那么当一条输入记录到来时,是否应当进行输出呢,如果输出,应当输出什么呢?

这个问题可以通过上一篇文章中的计算模型(2)和计算模型(3)进行回答,计算模型(2)是将这些数据进行分割,将无界数据集划分为有界数据集,只计算一段时间内的输入记录产生的结果,此时的语义和行为就可以与离线情况下一致;如果采用计算模型(3),要直接对无界数据集进行计算,那么,只有当数据流到尽头时的最后一次结果才与离线SQL一致,显然,Flink并不能只输出一次。那么,可以引入一个机制,对于每一次输入的计算结果,可以通过一个特殊符号将上次的输出结果作废,再输出新的结果,这样从这个输出流的角度来看,最终合并后的结果就与离线SQL一致了。


注意,这里的模型进行了简化,在实际的计算中,撤回不是必须的,而是在有必要修改状态时才进行撤回。引入了撤回机制,就可以在Flink SQL中实现计算模型(3)的操作。

1.2 动态表的输出模式

引入撤回机制后,流的输入输出模式变得多样化。使用态表时,支持的输入输出流的类型有:

(1)Append流:只支持插入模式的流。

(2)Changelog流:支持插入模式、更新模式和撤回模式的流。更新模式和撤回模式的区别是,更新模式在指定唯一键后仅用一条更新记录对下游进行更新,而撤回模式需要一条撤回记录和一条插入记录实现更新。由于这两种模式的效果相同,并且我们只研究当前操作的算子,后面不再区分撤回与更新,统一采用撤回称呼。

要说明的是,旧版本的Flink-Kafka架构下,大多数实时数仓场景下只支持追加写入,实际应用中Changelog流还未被广泛使用。因此,对于上游输入,默认为Append流,我们只研究在此基础上的输出流的模式。

*** 二、分组聚合的实现 ***

之所以在介绍分组聚合之前叙述Flink的运行机制和设计思想,归根结底是想说明一点,就是Flink在实现这个计算模型(3)的操作时,可能需要借助撤回机制,并且相对于研究选择与投影操作,我们需要关注更多的内容。

2.1 分组聚合的递归形式

回忆对计算模型(1)的研究,这种模型对应的算子是无记忆的,计算时不需要借助之前的输入信息,研究时通过核心抽象的生成逻辑确定了实现类的层次和类型,而后通过对实现类处理逻辑的研究总结了选择与投影这类操作的行为。到了计算模型(3)时,需要额外关注对于状态的实现及其行为,比如状态的内容、读写过程等。

在进行分组聚合时,状态存储的内容有两种选择,第一种是直接保存历史输入的明细,总是可以根据明细计算出汇总结果,其缺点就是当历史明细的存储和计算相当耗费资源;第二种是保存汇总的结果,这隐含了SQL中的分组聚合总是可以以一种递归的形式实现的假设:

这样就可以只保存本次输入前历史的汇总值,可以大大减少状态的存储和计算量,其缺点在于,这种递归的改写并不是对所有的操作都有效,需要根据应用时算法的不同具体讨论。Flink SQL在进行分组聚合操作时采用了递归的形式。
2.2 数据集与SQL案例

数据集依然之前的test表,有name和score两列:

name
score
Tom
12
John
15
Tom
18
Tom
19

需要计算的是某个人的得分次数:

    select  name
    ,count(1) as cnt
    from test
    group by name

    这个计算相当简单,但足够我们用来理解Flink SQL中分组汇总操作的实现了。

    2.3 核心抽象的生成逻辑

    Flink SQL中,分组聚合操作对应的StreamExecNode为StreamExecGroupAggregate,其核心抽象生成逻辑在translateToPlanInternal方法中:

      protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {

      ...


      // 获取上游输入的Transfomation
      final ExecEdge inputEdge = getInputEdges().get(0);
      final Transformation<RowData> inputTransform =
      (Transformation<RowData>) inputEdge.translateToPlan(planner);
      final RowType inputRowType = (RowType) inputEdge.getOutputType();
      // 创建聚合操作的Codegen生成器
      final AggsHandlerCodeGenerator generator =
      new AggsHandlerCodeGenerator(
      new CodeGeneratorContext(tableConfig),
      planner.getRelBuilder(),
      JavaScalaConversionUtil.toScala(inputRowType.getChildren()),
      true)
      .needAccumulate();

      ...


      // 将生成器转化为聚合操作的处理器
      final GeneratedAggsHandleFunction aggsHandler =
      generator.generateAggsHandler("GroupAggsHandler", aggInfoList);

      ...

      // Operator类型为OneInputStreamOperator,
      // 并根据是否开启MiniBatch优化生成不同的汇
      // 总函数,由于采用了Codegen技术,这里的汇
          // 总函数实际上是Java源码。
      final OneInputStreamOperator<RowData, RowData> operator;
      if (isMiniBatchEnabled) {
      ...
      } else {
      GroupAggFunction aggFunction =
      new GroupAggFunction(
      aggsHandler,
      recordEqualiser,
      accTypes,
      inputCountIndex,
      generateUpdateBefore,
      tableConfig.getIdleStateRetention().toMillis());
      operator = new KeyedProcessOperator<>(aggFunction);
      }
      // 根据算子创建Transformation
      final OneInputTransformation<RowData, RowData> transform =
      new OneInputTransformation<>(
      inputTransform,
      getDescription(),
      operator,
      InternalTypeInfo.of(getOutputType()),
      inputTransform.getParallelism());


      ...

      return transform;
      }

      这些核心抽象的生成逻辑比较简单,大致是按照Function、Operator和Transformation依次生成的,这也是SQL映射到Flink框架的经典构造过程。其中,Function的处理逻辑采用了CodeGen的技术,这也是将个性化的SQL汇总代码和通用逻辑进行了解耦。在生成Function和Operator时,会根据是否开启Mini-Batch而有两条构造支路,受限于篇幅限制,我们聚焦于流计算时的情况。

      2.4 分组汇总的执行逻辑

      Transformation主要负责构造执行图,真正的执行入口在Operator中,即KeyedProcessOperator#processElement方法。

        public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
            // 处理输入记录的具体方法
            // userFunction为GroupAggFunction
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
        }

        其会调用GroupAggFunction#processElement方法,分组汇总的执行逻辑基本都在这里:

          public void processElement(RowData input, Context ctx, Collector<RowData> out)
          throws Exception {

          // 获得当前记录的分组键,以及之前的累加器
          RowData currentKey = ctx.getCurrentKey();
          boolean firstRow;
          RowData accumulators = accState.value();
          // 如果之前的累计结果为空,说明为当前输入记录为分组键的首行
          if (null == accumulators) {
          ...
          firstRow = true;
          accumulators = function.createAccumulators();
          } else {
          firstRow = false;
          }
          // 设置汇总函数的累加器为累计状态,这个汇总函数就是Codegen生成的
          // Java汇总源码的实例化类
          function.setAccumulators(accumulators);
          // 获得汇总值
          RowData prevAggValue = function.getValue();


          // 这里会根据该输入是插入还是撤回做不同的处理
              if (isAccumulateMsg(input)) {
                  // 在历史汇总值的基础上增量计算汇总结果
          function.accumulate(input);
          } else {
          // 我们不研究输入流有撤回的情形
          ...
          }


          // 获得累计当前输入的汇总结果
          RowData newAggValue = function.getValue();
          // 获得当前累加器
          accumulators = function.getAccumulators();

          // 正常情形,输入计数器不为0,实际上是创建的累加器不为空
          if (!recordCounter.recordCountIsZero(accumulators)) {
          // 更新状态
          accState.update(accumulators);
          // 如果非首行,可能需要撤回
          if (!firstRow) {
          // 如果状态过期或者状态未发生改变,则不进行输出
          if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
          return;
          } else {
          // 否则需要进行撤回
          if (generateUpdateBefore) {
          resultRow
          .replace(currentKey, prevAggValue)
          .setRowKind(RowKind.UPDATE_BEFORE);
          out.collect(resultRow);
          }
          resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
          }
          } else {
          // 首行记录直接插入汇总结果
          resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
          }
          out.collect(resultRow);
          } else {
          // 异常的情况,记录计数器为0
          ...
          }
          }

          Flink SQL的分组汇总计算的逻辑还是条理清晰的,整个执行过程可以概括为:

          1.获取之前的累加器。注意,代码中区分了累加器和汇总值,但是在非窗口汇总的情况下,累加器和汇总值基本上是一个概念。

          2.将本次输入与累加器进行增量计算。

          3.将最新的汇总结果存入状态,如果计算前后状态不变,则直接返回;否则,先撤回原有的结果,再将新的结果插入下游。

          分组汇总的执行过程体现了实现计算模型(3)的基本手段,先取出状态,再将输入记录与状态一起计算,撤回需要修改的历史结果,输出新的结果;也能看出在状态的存储形式上选择了递归,即保存当前输入前的汇总值,而非明细。其输出行为也与计算过程一致,当分组键“Tom”上的输入记录大于1条时,会不断进行撤回和插入操作,合并后的值与离线结果一致:

            +I[Tom, 1]
            +I[John, 1]
            -U[Tom, 1]
            +U[Tom, 2]
            -U[Tom, 2]
            +U[Tom, 3]
            *** 三、总结 ***

            本篇内容可以概括如下:

            1.梳理了Flink SQL的输入输出模式,分析了撤回机制的动机

            2.详细说明了计算模型(3)的性质,并拓展了其递归形式。

            3.介绍了分组汇总算子的生成逻辑和执行过程,并展示了其输入输出行为。

            Flink SQL算子篇持续更新中,敬请期待!

            文章转载自大常哥的私房菜,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论