暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL算子篇(四)普通连接与状态管理

大常哥的私房菜 2022-08-16
1555
大家好,我是大常哥,本次我们继续研究计算模型(3)的常用算子——普通连接操作。
读者也许知道在离线时,数据库查询引擎在进行连接可能会同时对两侧表的全量数据行查找操作,然后将元组连接形成结果集。然而,在Flink中,进行连接操作的两张动态表可能永远不会终结,那么该如何计算、如何输出,才能保证结果集与离线一致呢?这个问题的答案可以从源码中找到。
*** 一、连接操作的实现 ***
1.1 数据集与SQL案例

显然,连接操作需要定义左、右两张动态表,左表沿用上一篇的学生得分记录的score_info数据集:

name
score
Tom
12
John
15
Tom
18
Tom
19

右表采用学生信息的student_info数据集:

name
age
Tom
8
John
9

此次主要考察Innner Join和Left Join两种连接方式的输出行为,通过左表在name上连接右表得到带有学生信息的得分数据。然而,得分记录和学生信息的发送顺序是不定的。

    -- 1.inner join时的SQL代码
    select t1.name as name
    ,t2.age as age
    ,t1.score as score
    from
    (
    select name
    ,score
    from score_info
    ) as t1
    INNER JOIN
    (
    select name
    ,age
    from student_info
    ) as t2
         on t1.name = t2.name


    -- 2.left join时的SQL代码
    select t1.name as name
    ,t2.age as age
    ,t1.score as score
    from
    (
    select name
    ,score
    from score_info
    ) as t1
    LEFT JOIN
    (
    select name
    ,age
    from student_info
    ) as t2
    on t1.name = t2.name


    1.2 核心抽象的创建过程

    普通连接操作在Flink中对应的StreamExecNode的实现类为StreamExecJoin类,Transformation、Operator、Function的生成逻辑在其内部的translateToPlanInternal方法中。

      protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
      // 得到左、右表的两个输入边
      final ExecEdge leftInputEdge = getInputEdges().get(0);
      final ExecEdge rightInputEdge = getInputEdges().get(1);
      final Transformation<RowData> leftTransform =
      (Transformation<RowData>) leftInputEdge.translateToPlan(planner);
      final Transformation<RowData> rightTransform =
      (Transformation<RowData>) rightInputEdge.translateToPlan(planner);
      final RowType leftType = (RowType) leftInputEdge.getOutputType();
      final RowType rightType = (RowType) rightInputEdge.getOutputType();
      ...


      // 获取连接键
      final int[] leftJoinKey = joinSpec.getLeftKeys();
      final int[] rightJoinKey = joinSpec.getRightKeys();
      ...


      // 采用Code生成判断关联条件的Function源代码
      GeneratedJoinCondition generatedCondition =
      JoinUtil.generateConditionFunction(tableConfig, joinSpec, leftType, rightType);
      long minRetentionTime = tableConfig.getMinIdleStateRetentionTime();
      AbstractStreamingJoinOperator operator;


      // 获取连接类型
      FlinkJoinType joinType = joinSpec.getJoinType();


      // 根据连接类型生成不同的Operator
      if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
      // Flink 1.13版本中并没有实现Anti与Semi连接,这里不再研究
      ...
      } else {
      // 生成内/外连接的Operator
      boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType == FlinkJoinType.FULL;
      boolean rightIsOuter =
      joinType == FlinkJoinType.RIGHT || joinType == FlinkJoinType.FULL;
      operator =
      new StreamingJoinOperator(
      leftTypeInfo,
      rightTypeInfo,
      generatedCondition,
      leftInputSpec,
      rightInputSpec,
      leftIsOuter,
      rightIsOuter,
      joinSpec.getFilterNulls(),
      minRetentionTime);
      }
      final RowType returnType = (RowType) getOutputType();
      // 创建的Transformation对象类型是具有两个上游输入的TwoInputTransformation
      final TwoInputTransformation<RowData, RowData, RowData> transform =
      new TwoInputTransformation<>(
      leftTransform,
      rightTransform,
      getDescription(),
      operator,
      InternalTypeInfo.of(returnType),
      leftTransform.getParallelism());
      // set KeyType and Selector for state
      RowDataKeySelector leftSelect =
      KeySelectorUtil.getRowDataSelector(leftJoinKey, leftTypeInfo);
      RowDataKeySelector rightSelect =
      KeySelectorUtil.getRowDataSelector(rightJoinKey, rightTypeInfo);
      transform.setStateKeySelectors(leftSelect, rightSelect);
      transform.setStateKeyType(leftSelect.getProducedType());
      return transform;
      }

      浏览整个创建过程,可以提炼出以下三点信息:

      1.Join操作产生的执行图上有两个上游输入节点,对应连接的左、右两张表。

      2.CodeGen技术被用来生成验证关联条件成立的函数,因为在SQL代码中关联条件是用户自定义的。
      3.普通连接操作的实现过程被封装在StreamingJoinOperator中,将在下一节展开介绍。
      1.3 普通关联的实现
      由于普通连接操作有左、右两个动态表的输入,因此StreamingJoinOperator类内部也有两个方法processElement1和processElement2分别处理两侧的输入记录。
        // 处理左表输入记录
        public void processElement1(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), leftRecordStateView, rightRecordStateView, true);
        }


        // 处理右表输入记录
        public void processElement2(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), rightRecordStateView, leftRecordStateView, false);
        }
        代码显示无论是哪一侧的上游有输入,计算过程都是一致的,对应方法为StreamingJoinOperator#processElment,只是输入参数不同。方法的输入参数依次为输入记录、输入侧状态、对侧状态以及是否为左表,假设输入为左表记录,则需要依次输入左表记录状态、右表记录状态以及true标志。
          private void processElement(
          RowData input,
          JoinRecordStateView inputSideStateView,
          JoinRecordStateView otherSideStateView,
          boolean inputIsLeft)
          throws Exception {
          // 根据输入侧标识得到每一侧表是否为外连接。
          boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
          boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
          boolean isAccumulateMsg = RowDataUtil.isAccumulateMsg(input);


          ...
              // 获取对侧输入记录能被连接记录
          AssociatedRecords associatedRecords =
          AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);


          // 上游仅追加记录的处理过程
          if (isAccumulateMsg) {
          // 处理输入侧是外连接的情况
          if (inputIsOuter) {
          // 这里需要先将输入侧的记录状态转为外连接记录状态的类型
          OuterJoinRecordStateView inputSideOuterStateView =
          (OuterJoinRecordStateView) inputSideStateView;
                      // 如果对侧还没有能被连接的记录,则对侧先用null填充,并输出
          if (associatedRecords.isEmpty()) {
          outRow.setRowKind(RowKind.INSERT);
          outputNullPadding(input, inputIsLeft);
          inputSideOuterStateView.addRecord(input, 0);

                      // 对侧能被连接的情形
          } else {
                          // 如果对侧是外连接,并且还没有被连接到过,则需要先撤销之前输出采用null填充的结果。
          if (otherIsOuter) {
          OuterJoinRecordStateView otherSideOuterStateView =
          (OuterJoinRecordStateView) otherSideStateView;
          for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) {
          RowData other = outerRecord.record;
          if (outerRecord.numOfAssociations == 0) {
          outRow.setRowKind(RowKind.DELETE);
          outputNullPadding(other, !inputIsLeft);
          }
          otherSideOuterStateView.updateNumOfAssociations(
          other, outerRecord.numOfAssociations + 1);
          }
          }


          // 然后输出关联到的结果集
          outRow.setRowKind(RowKind.INSERT);
          for (RowData other : associatedRecords.getRecords()) {
          output(input, other, inputIsLeft);
          }

          // 向输入侧记录状态添加当前输入记录以及关联到记录的数目
          inputSideOuterStateView.addRecord(input, associatedRecords.size());
          }


          // 输入侧不是外连接的处理过程
          } else {
          // 向输入侧记录状态添加当前输入记录
          inputSideStateView.addRecord(input);
          // 能被连接的记录非空才产生输出
          if (!associatedRecords.isEmpty()) {
          // 如果对侧是外连接,与之前的处理过程类似
          if (otherIsOuter) {
          OuterJoinRecordStateView otherSideOuterStateView =
          (OuterJoinRecordStateView) otherSideStateView;
          for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) {
          if (outerRecord.numOfAssociations
          == 0) {
          outRow.setRowKind(RowKind.DELETE);
          outputNullPadding(outerRecord.record, !inputIsLeft);
          }
          otherSideOuterStateView.updateNumOfAssociations(
          outerRecord.record, outerRecord.numOfAssociations + 1);
          }
          outRow.setRowKind(RowKind.INSERT);
          } else {
          outRow.setRowKind(inputRowKind);
          }


          // 输出结果
          for (RowData other : associatedRecords.getRecords()) {
          output(input, other, inputIsLeft);
          }
          }
          }
          } else {
          // 不研究上游有撤回的情况
          ...
          }
          }
          普通连接的实现过程还是非常繁琐的,因为输入侧、输出侧都有内/外连接两种状态,每种状态都对应了不同的处理方法。整个连接过程的大致步骤如下:

          1.根据输入参数,判断输入侧、对侧的连接是否为外连接。

          2.获取侧能被连接的记录。

          3.连接两侧记录,输出结果。

          (1)如果输入侧为外连接,如果侧还没有能被连接的记录,则将侧用null填充后输出缺省结果,并将当前输入和关联到的记录数添加到状态;否则,需要先判断侧是否曾经输出过缺省结果,如果是则先撤回,然后输出当前记录的连接结果,并将当前输入和关联到的记录数添加到状态

          (2)如果输入侧为内连接,则能关联到侧记录才能输出结果,如果侧曾经输出过缺省结果,则与(1)中处理方式相同。

          可以总结输入侧和侧相同连接类型下的不同特性:

          1.输入侧为外连接时,如果连接不到侧的记录,会输出缺省值;而内连接不会进行输出。当能连接到时,二者会输出当前记录的连接结果。

          2.侧为外连接时,需要判断其是否曾经输出过缺省结果,如果是则需要进行撤回。
          3.对于Inner Join的场景,两侧都是内连接,计算过程中不会发生撤回,此时下游的输出模式也可以是Append流。
          在计算的过程中,撤回主要发生在对侧首次被连接到时,需要撤回之前的缺省结果。

          当左表输入(Tom,12)元组时,右表还没有数据,由于左表是外连接,因此采用null填充age字段,输出(Tom,null,12);当右表输入(Tom,8)后,引起了之前左表连接结果(Tom,null,12)的撤回,插入更新后的结果(Tom,8,12);而后左表输入(Tom,18),可以关联到右表(Tom,8)的记录,向下游插入结果(Tom,null,18);后面依次类推。

          当右表输入(Tom,8)元组时,该侧属于内连接,由于关联不到左表数据,所以不会产生任何输出;当左表输入(Tom,12)元组时,计算后向下游插入结果(Tom,8,12);后面依次类推。可以发现,Flink在计算连接时也是根据当前的输入关联对侧动态表已有的记录,递归计算当前输入记录的连接结果,这样结果集就与离线SQL一致。
          到这里,Flink SQL普通连接的计算过程已经基本介绍完了,只剩下一个问题还有待解决,就是如何从对侧状态中拿到可以连接的数据,连接键为Tom的得分会拿到John的信息么?这个问题将在下一节中详细讨论。
          *** 二、Flink的状态管理 ***

          本节将会通过普通连接中输入记录的存取来介绍Flink的状态管理。其实在前一篇介绍分组聚合操作的内容中,状态存储碍于篇幅限制并没有被深入探讨,其问题在于同一个Task会处理多个分组键或连接键的记录,那么当处理某一条记录时,Task如何获取到相同键的状态。

          2.1 连接中记录的获取
          在StreamingJoinOperator#processElment的源码中,对侧表可以被连接的数据是在状态中获取的。
            private void processElement(
            RowData input,
            JoinRecordStateView inputSideStateView,
            JoinRecordStateView otherSideStateView,
            boolean inputIsLeft)
                    throws Exception {


            ...
            // 获取对侧输入记录能被连接记录
            AssociatedRecords associatedRecords =
            AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);


            ...
            }
            具体的逻辑在AssociatedRecords#of方法中:
              public static AssociatedRecords of(
              RowData input,
              boolean inputIsLeft,
              JoinRecordStateView otherSideStateView,
              JoinCondition condition)
              throws Exception {


              List<OuterRecord> associations = new ArrayList<>();

              // 如果状态视图类型是外连接,则连接条件成立的同时还需要获得对侧输入是否被连接的次数
              if (otherSideStateView instanceof OuterJoinRecordStateView) {
              OuterJoinRecordStateView outerStateView =
              (OuterJoinRecordStateView) otherSideStateView;
              Iterable<Tuple2<RowData, Integer>> records =
              outerStateView.getRecordsAndNumOfAssociations();
              for (Tuple2<RowData, Integer> record : records) {
              boolean matched =
              inputIsLeft
              ? condition.apply(input, record.f0)
              : condition.apply(record.f0, input);
              if (matched) {
              associations.add(new OuterRecord(record.f0, record.f1));
              }
              }
              } else {
              Iterable<RowData> records = otherSideStateView.getRecords();
              for (RowData record : records) {
              boolean matched =
              inputIsLeft
              ? condition.apply(input, record)
              : condition.apply(record, input);
              if (matched) {
              // use -1 as the default number of associations
              associations.add(new OuterRecord(record, -1));
              }
              }
              }
              return new AssociatedRecords(associations);
              }
              由于外连接和内连接采用了不同的状态视图,这里挑选内连接的对侧状态视图为研究对象,记录获取的方法为JoinRecordStateView#getRecords。然而JoinRecordStateView是一个接口,案例SQL对应的实现类为InputSideHasNoUniqueKey类,其getRecords方法如下:
                public Iterable<RowData> getRecords() throws Exception {
                return new IterableIterator<RowData>() {


                // 从状态中获取迭代器对象
                private final Iterator<Map.Entry<RowData, Integer>> backingIterable =
                recordState.entries().iterator();

                ...
                };
                }
                这个方法稍显复杂,内部实现了一个迭代器,这并不是我们关心的内容,只需要知道对侧记录还是通过MapState类型的recordState获取就够了。
                  public interface MapState<UK, UV> extends State {


                      ...
                      
                  Iterable<Map.Entry<UK, UV>> entries() throws Exception;

                  ...
                  }
                  这里的问题是MapState类似于一个Map,获取状态时并没有传入类似当前连接键的参数,其entries方法会似乎会获得内部的保存的所有记录,那么输入侧连接键为Tom的记录是否会获取到对侧John的明细记录呢?要回答这个问题,需要知道MapState中的保存的是什么,拿到的又是什么,这就涉及到Flink的状态管理机制。
                  2.2 Flink状态管理
                  为了搞清楚连接时从MapState中拿到的到底是什么,需要研究下具体实现类的源码,案例SQL对应的实现类为HeapMapState,其entries方法如下。
                    public Iterable<Map.Entry<UK, UV>> entries() {
                        // 从状态表中获取当前状态的Map
                    Map<UK, UV> userMap = stateTable.get(currentNamespace);
                    return userMap == null ? Collections.emptySet() : userMap.entrySet();
                    }
                    真正的玄机就在stateTable中:
                      public S get(N namespace) {


                      return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
                      }


                      // 真正调用的方法
                      private S get(K key, int keyGroupIndex, N namespace) {


                      checkKeyNamespacePreconditions(key, namespace);


                      StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);

                      if (stateMap == null) {
                      return null;
                      }
                          // 根据键、命名空间获取键控状态
                      return stateMap.get(key, namespace);
                      }
                      stateTable的类型为StateTable类,这个类可以简单理解为以Map的形式保存了各种键的状态。所以拿到的是键的状态,在连接中几乎等同于连接键。直观上来说,每个键的MapState保存的内容类似下表。
                      key
                      value
                      Tom
                      (Tom,12)
                      Tom(Tom,18)
                      Tom(Tom,19)
                      在Flink SQL中计算所用到的状态许多都是键控状态,键可以是SQL中的分组字段、排序字段或者连接字段,以保证拿到的是相同字段值下的状态。也就是说,连接键为Tom的记录不会获取到John的明细记录。而在拿到对应的键控状态前,需要先设置当前处理记录的键,连接操作中是在StreamTwoInputProcessorFactory中进行的,该类持有了StreamingJoinOperator对象。
                        private static <T> void processRecord1(
                        StreamRecord<T> record, TwoInputStreamOperator<T, ?, ?> streamOperator)
                        throws Exception {
                        // 设置当前记录的键
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement1(record);
                        }


                        private static <T> void processRecord2(
                        StreamRecord<T> record, TwoInputStreamOperator<?, T, ?> streamOperator)
                        throws Exception {
                        // 设置当前记录的键
                        streamOperator.setKeyContextElement2(record);
                        streamOperator.processElement2(record);
                        }

                        当然,这些关于状态的内容只是Flink状态管理机制的冰山一角,但对于研究Flink SQL的计算过程已经够用了。

                        *** 三、总结 ***
                        本篇内容可以概括如下:
                        1.研究了Flink中普通连接的核心对象和计算过程。

                        2.探讨了输入侧和对侧在内、外连接的特性和输出行为,讨论了两侧不同输入顺序时的输出差异。

                        3.简要介绍了Flink SQL中使用的状态管理机制。

                        Flink SQL算子篇持续更新中,敬请期待!


                        文章转载自大常哥的私房菜,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论