暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink Table Api 完整源码解析→

大数据技能圈 2023-12-21
66

前面我们已经知道了如何去修改编译flink源码

源码剖析 | Flink源码修改编译保姆级教程-->

Streaming API 版 WordCount运行流程源码解析

万字长文 | 透过现象看本质 | 从WordCount 到Flink Streaming API 源码详细解读

今天我们接着来研究 Table Api 版 WordCount是如何运行的

我们先看代码:

    public class WordCountWzy {
    private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);


    public static void main(String[] args) throws Exception {


    LOGGER.warn(DATASET_DEPRECATION_INFO);


    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);


    // 设置执行环境
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


    DataSet<String> text = null;


    text = WordCountData.getDefaultTextLineDataSet(env);


    DataSet<Tuple2<String, Integer>> counts =
    // 将行分成二元组,包含:(word,1)
    text.flatMap(new WordCount.Tokenizer())
    // 按元组字段“0”分组,并对元组字段“1”求和
    .groupBy(0)
    .sum(1);


    counts.print();


    }


    /**
    * 实现了string分词器,将句子拆分成单词,作为用户定义的FlatMapFunction。
    * 该函数接受一行(字符串),并将其拆分为“(word,1)”形式的多个对。({@code Tuple2<String, Integer>})。
    */
    public static final class Tokenizer
    implements FlatMapFunction<String, Tuple2<String, Integer>> {


    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    String[] tokens = value.toLowerCase().split("\\W+");
    for (String token : tokens) {
    if (token.length() > 0) {
    out.collect(new Tuple2<>(token, 1));
    }
    }
    }
    }
    }




    public class WordCountData {


    public static final String[] WORDS =
    new String[] {
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,",
    "And by opposing end them?--To die,--to sleep,--",
    "No more; and by a sleep to say we end",
    "The heartache, and the thousand natural shocks",
    "That flesh is heir to,--'tis a consummation",
    "Devoutly to be wish'd. To die,--to sleep;--",
    "To sleep! perchance to dream:--ay, there's the rub;",
    "For in that sleep of death what dreams may come,",
    "When we have shuffled off this mortal coil,",
    "Must give us pause: there's the respect",
    "That makes calamity of so long life;",
    "For who would bear the whips and scorns of time,",
    "The oppressor's wrong, the proud man's contumely,",
    "The pangs of despis'd love, the law's delay,",
    "The insolence of office, and the spurns",
    "That patient merit of the unworthy takes,",
    "When he himself might his quietus make",
    "With a bare bodkin? who would these fardels bear,",
    "To grunt and sweat under a weary life,",
    "But that the dread of something after death,--",
    "The undiscover'd country, from whose bourn",
    "No traveller returns,--puzzles the will,",
    "And makes us rather bear those ills we have",
    "Than fly to others that we know not of?",
    "Thus conscience does make cowards of us all;",
    "And thus the native hue of resolution",
    "Is sicklied o'er with the pale cast of thought;",
    "And enterprises of great pith and moment,",
    "With this regard, their currents turn awry,",
    "And lose the name of action.--Soft you now!",
    "The fair Ophelia!--Nymph, in thy orisons",
    "Be all my sins remember'd."
    };


    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
    return env.fromElements(WORDS);
    }
    }

    01

    设置运行环境

      // 设置执行环境
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      点击getExecutionEnvironment()

        /**
        * 创建一个执行环境,该环境代表了程序当前执行的上下文。如果程序是独立调用的,此方法返回一个本地执行环境,
        * 如 {@link #createLocalEnvironment()} 返回的那样。如果程序是从命令行客户端调用并提交到集群中,此方法返回该集群的执行环境。
        *
        * @return 程序执行上下文的执行环境。
        */
        public static ExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
        .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
        .orElseGet(ExecutionEnvironment::createLocalEnvironment);
        }
        代码逻辑如下:
        首先,使用 Utils.resolveFactory 方法试图从线程本地变量 threadLocalContextEnvironmentFactory 或全局变量 contextEnvironmentFactory 获取一个 ExecutionEnvironmentFactory 工厂对象。
        如果能够获取到 ExecutionEnvironmentFactory,则调用其 createExecutionEnvironment 方法创建一个 ExecutionEnvironment 对象。
        如果获取不到 ExecutionEnvironmentFactory,那么就直接调用 ExecutionEnvironment.createLocalEnvironment 方法创建一个本地的执行环境。
        这里的 “orElseGet” 操作是 Java 8 的 Optional 类的方法,用于在 Optional 对象为空时提供一个默认的行为。在这个例子中,如果无法从 threadLocalContextEnvironmentFactory 或 contextEnvironmentFactory 中获取到 ExecutionEnvironmentFactory,则默认创建一个本地的 ExecutionEnvironment。

        点击 createLocalEnvironment

          /**
          * 创建一个{@link LocalEnvironment}。本地执行环境将在创建环境的同一JVM中以多线程方式运行程序。
          * 本地环境的默认并行度是硬件上下文的数量(CPU核心/线程),除非通过{@link #setDefaultLocalParallelism(int)}指定了不同的值。
          *
          * @return A local execution environment.
          */
          public static LocalEnvironment createLocalEnvironment() {
          return createLocalEnvironment(defaultLocalDop);
          }
          这是一个静态方法,方法名为createLocalEnvironment,它不需要任何参数并返回一个LocalEnvironment类型的实例。在这个方法中,它调用了同名的另一个createLocalEnvironment方法,该方法接收一个参数,该参数是defaultLocalDop,这可能是一个默认的本地并行度。这种编程模式通常用于提供一个简单的默认选项,同时还允许更复杂的配置。这里的defaultLocalDop是硬件上下文的数量(CPU核心/线程)通过
          private static int defaultLocalDop = Runtime.getRuntime().availableProcessors(); 来获取

          点击 createLocalEnvironment(defaultLocalDop);

            /**
            * 创建一个{@link LocalEnvironment}。本地执行环境将在创建环境的同一JVM中以多线程方式运行程序。
            * 它将使用参数中指定的并行性。
            *
            * @param parallelism 本地环境的并行度。
            * @return 具有指定并行度的本地执行环境。
            */
            public static LocalEnvironment createLocalEnvironment(int parallelism) {
            return createLocalEnvironment(new Configuration(), parallelism);
            }
            这个方法是一个静态方法,名为createLocalEnvironment,它接收一个整型参数parallelism(并行度),然后返回一个LocalEnvironment类型的对象。
            在这个方法中,它调用了同名但参数不同的另一个createLocalEnvironment方法,这个方法接收两个参数:一个新创建的Configuration对象和传入的并行度parallelism。
            这个方法的目的是创建一个具有特定并行度的LocalEnvironment对象,其中并行度就是在执行任务时可以同时运行的线程数。这种编程模式允许用户创建一个具有自定义并行度的本地环境。

            点击 createLocalEnvironment(new Configuration(), parallelism);方法

              /**
              * 创建一个用于执行 Flink 作业的{@link LocalEnvironment}。
              *
              * @param configuration 启动带有 {@link LocalEnvironment}的
              * @param defaultParallelism 用{@link LocalEnvironment}来初始化
              * @return {@link LocalEnvironment}
              */
              private static LocalEnvironment createLocalEnvironment(
              Configuration configuration, int defaultParallelism) {
              final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);


              if (defaultParallelism > 0) {
              localEnvironment.setParallelism(defaultParallelism);
              }


              return localEnvironment;
              }
              这是一个私有静态方法,名为createLocalEnvironment,它接收一个Configuration对象和一个整型参数defaultParallelism(默认并行度)。
              首先,它创建了一个新的LocalEnvironment对象,传入了Configuration对象。
              然后,如果defaultParallelism大于0,就会将这个值设置为LocalEnvironment对象的并行度。
              最后,返回这个LocalEnvironment对象。
              这个方法的主要作用是创建一个配置过并行度的LocalEnvironment对象,LocalEnvironment对象是Flink应用在本地环境中运行时的上下文。

              点击 new LocalEnvironment(configuration);

                /**
                * 创建一个新的本地环境,该环境使用给定的配置来配置其本地执行器。
                *
                * @param config 用于配置本地执行器的配置。
                */
                public LocalEnvironment(Configuration config) {
                super(validateAndGetConfiguration(config));
                }
                这是LocalEnvironment类的一个公开构造函数,接收一个Configuration类型的参数。
                在这个构造函数中,它调用了父类的构造函数,传入了validateAndGetConfiguration(config)方法的返回值。这个方法用于验证和获取配置信息。
                这个构造函数的主要作用是创建一个LocalEnvironment对象,并使用传入的配置信息初始化这个对象。

                点击 super(validateAndGetConfiguration(config));

                  /**
                  * 创建一个新的{@link ExecutionEnvironment},它将使用给定的{@link Configuration}来配置{@link PipelineExecutor}。
                  */
                  @PublicEvolving
                  public ExecutionEnvironment(final Configuration configuration) {
                  this(configuration, null);
                  }
                  这也是一个构造函数,它用于创建一个ExecutionEnvironment对象。这个构造函数接收一个Configuration对象作为参数。
                  在这个构造函数体内,调用了一个接收两个参数的构造函数this(configuration, null)。这意味着这个构造函数会使用传入的Configuration对象和一个null值来创建一个ExecutionEnvironment对象。

                  点击 this(configuration, null);

                    /**
                    * 创建一个新的{@link ExecutionEnvironment},它将使用给定的{@link Configuration}来配置{@link PipelineExecutor}。
                    *
                    * <p>此外,此构造函数允许指定用户代码的{@link ClassLoader}。
                    */
                    @PublicEvolving
                    public ExecutionEnvironment(
                    final Configuration configuration, final ClassLoader userClassloader) {
                    this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
                    }
                    这是一个构造函数,用于创建一个 ExecutionEnvironment 对象。这个构造函数接收一个 Configuration 对象和一个 ClassLoader 对象作为参数。
                    在这个构造函数体内,调用了一个接收三个参数的构造函数 this(new DefaultExecutorServiceLoader(), configuration, userClassloader)。这意味着这个构造函数会使用一个新创建的 DefaultExecutorServiceLoader 对象,传入的 Configuration 对象,和传入的 ClassLoader 对象来创建一个 ExecutionEnvironment 对象。
                    DefaultExecutorServiceLoader 是 Flink 用来加载执行服务的默认实现,例如一个远程的 Flink 集群或者一个本地的执行环境。
                    Configuration 是 Flink 用来配置执行环境的类。比如,可以设置任务的并行度,是否使用 checkpoint 等。
                    ClassLoader 是 Java 用来加载类和资源的一个部分。在这里,userClassloader 是用户定义的类加载器,允许用户自定义如何加载类和资源。

                    点击 this(new DefaultExecutorServiceLoader(), configuration, userClassloader);

                       **
                      * 创建一个新的{@link ExecutionEnvironment},将使用给定的{@link Configuration}来配置{@link PipelineExecutor}。
                      *
                      * <p>此外,此构造函数还允许指定{@link PipelineExecutorServiceLoader}和用户代码的{@link ClassLoader}。
                      */
                      @PublicEvolving
                      public ExecutionEnvironment(
                      final PipelineExecutorServiceLoader executorServiceLoader,
                      final Configuration configuration,
                      final ClassLoader userClassloader) {
                      this.executorServiceLoader = checkNotNull(executorServiceLoader);
                      this.configuration = new Configuration(checkNotNull(configuration));
                      this.userClassloader =
                      userClassloader == null ? getClass().getClassLoader() : userClassloader;


                      一个作业或操作符的配置可以在以下地方指定:
                      i) 在操作符级别,例如使用SingleOutputStreamOperator.setParallelism()设置并行度。
                      ii) 通过编程方式,例如使用env.setRestartStrategy()方法
                      iii) 在这里传递的配置中
                      如果在多个地方指定,则优先级顺序就是上述的顺序。


                      鉴于此,可以在此处覆盖执行配置的默认值,因为所有其他方式都假设环境已经实例化,因此它们将覆盖在此处传递的值。
                      this.configure(this.configuration, this.userClassloader);
                      }
                      这是一个构造函数,用于创建一个 ExecutionEnvironment 对象。这个构造函数接收一个 PipelineExecutorServiceLoader 对象,一个 Configuration 对象,和一个 ClassLoader 对象作为参数。
                      在这个构造函数体内:
                      它首先使用 checkNotNull() 方法来确保传入的 executorServiceLoader 和 configuration 参数不为 null,如果为 null,那么将抛出 NullPointerException。
                      然后它将这两个参数赋值给 this.executorServiceLoader 和 this.configuration。对于 configuration 参数,它创建了一个新的 Configuration 对象,并将传入的 configuration 对象的内容复制到新的对象中。
                      对于 userClassloader 参数,如果它为 null,那么将使用当前类的类加载器;否则,使用传入的 userClassloader。
                      最后,它调用 this.configure() 方法来配置 ExecutionEnvironment 对象。
                      PipelineExecutorServiceLoader 是一个接口,用于加载和创建 PipelineExecutor。PipelineExecutor 是一个接口,它定义了如何执行一个数据流程序。
                      Configuration 是 Flink 用来配置执行环境的类。例如,可以设置任务的并行度,是否使用 checkpoint 等。
                      ClassLoader 是 Java 用来加载类和资源的一个部分。在这里,userClassloader 是用户定义的类加载器,允许用户自定义如何加载类和资源。
                      因此,这个构造函数允许用户在创建 ExecutionEnvironment 对象时,提供自己的执行服务加载器,配置和类加载器。
                      至此,环境部分创建完成了。

                      02

                      读取数据源

                        return env.fromElements(WORDS);

                        点击 env.fromElements(WORDS);

                          @SafeVarargs
                          public final <X> DataSource<X> fromElements(X... data) {
                          if (data == null) {
                          throw new IllegalArgumentException("The data must not be null.");
                          }
                          if (data.length == 0) {
                          throw new IllegalArgumentException("The number of elements must not be zero.");
                          }


                          TypeInformation<X> typeInfo;
                          try {
                          typeInfo = TypeExtractor.getForObject(data[0]);
                          } catch (Exception e) {
                          throw new RuntimeException(
                          "Could not create TypeInformation for type "
                          + data[0].getClass().getName()
                          + "; please specify the TypeInformation manually via "
                          + "ExecutionEnvironment#fromElements(Collection, TypeInformation)",
                          e);
                          }


                          return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
                          }
                          这是一个名为fromElements的公共方法,它采用可变参数X… data作为输入,返回一个DataSource。@SafeVarargs注解用于通知编译器,方法中的可变参数类型是安全的,不会导致堆污染。
                          该方法首先检查输入的数据是否为空,如果为空,则抛出IllegalArgumentException异常。接下来,如果输入数据的长度为零,也会抛出IllegalArgumentException异常。
                          然后,它尝试获取第一个数据元素的类型信息(TypeInformation),如果无法获取,则抛出RuntimeException,提示用户手动通过ExecutionEnvironment#fromElements(Collection, TypeInformation)指定类型信息。
                          最后,它会使用TypeInformation和数据创建一个新的DataSource,然后返回这个DataSource。这个方法主要用于从给定的数据元素创建一个新的DataSource。

                          点击 fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); 方法

                            private <X> DataSource<X> fromCollection(
                            Collection<X> data, TypeInformation<X> type, String callLocationName) {
                            CollectionInputFormat.checkCollection(data, type.getTypeClass());
                            return new DataSource<>(
                            this,
                            new CollectionInputFormat<>(data, type.createSerializer(config)),
                            type,
                            callLocationName);
                            }
                            这是一个名为fromCollection的私有方法,它接收一个类型为X的集合(Collection data)、一个类型信息(TypeInformation type)和一个调用位置名称(String callLocationName)作为参数,并返回一个DataSource。
                            首先,方法调用CollectionInputFormat.checkCollection来检查集合数据和类型是否匹配。
                            然后,它创建一个新的DataSource。DataSource的构造函数接收四个参数:当前环境(this)、一个CollectionInputFormat(它用提供的数据和序列化器创建)、类型信息和调用位置名称。

                            点击 new DataSource

                              /**
                              * 创建一个新的数据源。
                              *
                              * @param context 数据源执行的环境。
                              * @param inputFormat 数据源执行的输入格式。
                              * @param type 这种输入格式产生的元素类型。
                              */
                              public DataSource(
                              ExecutionEnvironment context,
                              InputFormat<OUT, ?> inputFormat,
                              TypeInformation<OUT> type,
                              String dataSourceLocationName) {
                              super(context, type);


                              this.dataSourceLocationName = dataSourceLocationName;


                              if (inputFormat == null) {
                              throw new IllegalArgumentException("The input format may not be null.");
                              }


                              this.inputFormat = inputFormat;


                              if (inputFormat instanceof NonParallelInput) {
                              this.parallelism = 1;
                              }
                              }
                              创建一个新的数据源(DataSource)的构造函数。
                              参数包括:
                              context:数据源执行的环境,类型是 ExecutionEnvironment。
                              inputFormat:数据源执行的输入格式,类型是 InputFormat,这是一个泛型接口,OUT 是输出数据的类型,? 表示任何类型的输入数据。
                              type:这种输入格式产生的元素类型,类型是 TypeInformation,这也是一个泛型接口,OUT 是输出数据的类型。
                              dataSourceLocationName:数据源的位置名,类型是 String。
                              在构造函数内部,首先调用了父类的构造函数,然后设置了数据源的位置名。
                              如果输入格式为 null,则抛出 IllegalArgumentException。
                              设置输入格式,并根据输入格式是否为 NonParallelInput 类型,决定并行度(parallelism)的值。如果是 NonParallelInput 类型,则并行度为 1,否则并行度默认为执行环境的默认并行度。

                              点击new CollectionInputFormat<>(data, type.createSerializer(config)),

                                public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
                                if (dataSet == null) {
                                throw new NullPointerException();
                                }


                                this.serializer = serializer;


                                this.dataSet = dataSet;
                                }
                                这段代码是CollectionInputFormat类的构造函数,用于创建一个新的CollectionInputFormat对象。
                                参数包括:
                                dataSet:一个集合,包含了数据源的所有元素,类型是 Collection,其中T表示元素的类型。
                                serializer:用于序列化和反序列化元素的工具,类型是 TypeSerializer,其中T表示元素的类型。
                                在构造函数内部,首先检查dataSet是否为null。如果为null,就抛出NullPointerException。
                                然后,设置serializer和dataSet的值。
                                最终数据会从 Array类型 转成Collection 类型并返回

                                03

                                数据处理逻辑

                                  DataSet<Tuple2<String, Integer>> counts =
                                  // 将行分成二元组,包含:(word,1)
                                  text.flatMap(new WordCount.Tokenizer())
                                  // 按元组字段“0”分组,并对元组字段“1”求和
                                  .groupBy(0)
                                  .sum(1);

                                  04

                                  执行任务

                                    counts.print();

                                    点击print();

                                      /**
                                      * 将DataSet中的元素打印到调用print()方法的JVM的标准输出流{@link System#out}。
                                      * 对于在集群中执行的程序,此方法需要将DataSet的内容收集回客户端,然后在那里打印它。
                                      *
                                      * <p>每个元素写入的字符串由{@link Object#toString()}方法定义。
                                      *
                                      * <p>此方法会立即触发程序执行,类似于{@link #collect()}和{@link #count()}方法。
                                      *
                                      * @see #printToErr()
                                      * @see #printOnTaskManager(String)
                                      */
                                      public void print() throws Exception {
                                      List<T> elements = collect();
                                      for (T e : elements) {
                                      System.out.println(e);
                                      }
                                      }

                                      这是一个公共的print()方法,它可能会抛出异常。在这个方法中,首先调用collect()方法收集元素并将这些元素存储在一个名为elements的列表中。然后,使用for-each循环遍历这个列表,将每个元素e打印到控制台。

                                      点击 collect();

                                        /**
                                        * 这是一个便利的方法,用于将DataSet的元素获取为一个列表。由于DataSet可能包含大量的数据,因此应谨慎使用此方法。
                                        *
                                        * @return 一个包含DataSet元素的列表。
                                        */
                                        public List<T> collect() throws Exception {
                                        final String id = new AbstractID().toString();
                                        final TypeSerializer<T> serializer =
                                        getType().createSerializer(getExecutionEnvironment().getConfig());


                                        this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
                                        JobExecutionResult res = getExecutionEnvironment().execute();


                                        ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
                                        if (accResult != null) {
                                        try {
                                        return SerializedListAccumulator.deserializeList(accResult, serializer);
                                        } catch (ClassNotFoundException e) {
                                        throw new RuntimeException("Cannot find type class of collected data type.", e);
                                        } catch (IOException e) {
                                        throw new RuntimeException(
                                        "Serialization error while deserializing collected data", e);
                                        }
                                        } else {
                                        throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
                                        }
                                        }
                                        这段代码定义了一个名为collect的公共方法,它返回一个类型为T的列表。这个方法可能会抛出异常。
                                        在方法内部,首先创建了一个新的AbstractID的实例,并将其转换为字符串。
                                        然后,创建了一个TypeSerializer的实例,它用于序列化和反序列化类型为T的对象。TypeSerializer是通过调用getType方法获取的Type的createSerializer方法创建的,该方法需要一个参数,即通过调用getExecutionEnvironment方法获取的ExecutionEnvironment的配置。
                                        接下来,调用了this的output方法,该方法以一个名为"collect()"的CollectHelper实例作为参数。这个CollectHelper实例是通过使用id和serializer作为参数创建的。
                                        然后,执行了getExecutionEnvironment方法获取的ExecutionEnvironment的execute方法,其结果被保存在JobExecutionResult类型的变量res中。
                                        接下来,从res中获取累加器的结果,结果被保存在一个字节数组列表accResult中。
                                        如果accResult不为空,则尝试使用serializer反序列化accResult,并返回结果。如果在反序列化过程中发生ClassNotFoundException或IOException,则抛出RuntimeException。
                                        如果accResult为空,则抛出一个RuntimeException,表明collect()方法无法获取DataSet。
                                        这里需要注意的是这段代码
                                        this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");

                                        点击 output(new Utils.CollectHelper<>(id, serializer)).name("collect()");

                                          /**
                                          * 使用{@link OutputFormat}发出一个DataSet。这个方法为程序添加了一个 data sink.
                                          *程序可以有多个 data sinks. 可以同时有多个消费者 (data sinks or transformations).
                                          *
                                          * @param outputFormat 处理DataSet的OutputFormat。
                                          * @return 处理DataSet的DataSink。
                                          * @see OutputFormat
                                          * @see DataSink
                                          */
                                          public DataSink<T> output(OutputFormat<T> outputFormat) {
                                          Preconditions.checkNotNull(outputFormat);


                                          configure the type if needed
                                          if (outputFormat instanceof InputTypeConfigurable) {
                                          ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
                                          }


                                          DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
                                          this.context.registerDataSink(sink);
                                          return sink;
                                          }
                                          这是一个公开的函数,名为output,接受一个OutputFormat参数,用于设定输出格式。在函数体内,首先检查传入的outputFormat参数是否为空。
                                          如果outputFormat实现了InputTypeConfigurable接口,即它是可配置输入类型的,那么就通过setInputType方法为其设置输入类型。这里的输入类型是通过getType方法获取的,context.getConfig()则用来获取配置信息。
                                          接着,创建了一个DataSink对象sink。DataSink是一个数据汇聚点,它将从DataSet中获取数据,并按照指定的outputFormat进行处理。
                                          然后,通过context.registerDataSink(sink)将创建的sink注册到上下文中。
                                          最后,函数返回创建的sink对象。

                                          点击 this.context.registerDataSink(sink);

                                          从上述每个数据集对象的构造过程对应的代码可以发现,上一个数据集对象都会作为下一个数据集对象的字段被传递下来,最终在DataSink对象中就有了完整的计算逻辑。
                                          第二个需要注意的代码是
                                          JobExecutionResult res = getExecutionEnvironment().execute();
                                          可以看出collect方法中是调用了excute方法

                                          点击execute();

                                            /**
                                            * 触发程序执行。环境将执行所有导致"sink"操作的程序部分。例如,sink操作包括打印结果({@link DataSet#print()},
                                            * 写入结果(例如{@link DataSet#writeAsText(String)},{@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}
                                            * 或者使用{@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}创建的其他通用数据sink。
                                            *
                                            * <p>程序执行将被记录并显示,显示的名称将使用生成的默认名称。
                                            *
                                            * @return 作业执行的结果,包含经过的时间和累加器
                                            * @throws Exception Thrown, if the program executions fails.
                                            */
                                            public JobExecutionResult execute() throws Exception {
                                            return execute(getJobName());
                                            }

                                            点击 execute(getJobName());

                                              /**
                                              * 触发程序执行。环境将执行程序中所有导致“sink”操作的部分。例如,sink 操作包括打印结果({@link DataSet#print()}),
                                              * 写入结果(例如,{@link DataSet#writeAsText(String)},{@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)},
                                              * 或者使用 {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)} 创建的其他通用数据 sink。
                                              *
                                              * <p>程序执行将被记录并以给定的作业名称显示。
                                              *
                                              * @return The result of the job execution, containing elapsed time and accumulators.
                                              * @throws Exception Thrown, if the program executions fails.
                                              */
                                              public JobExecutionResult execute(String jobName) throws Exception {
                                              final JobClient jobClient = executeAsync(jobName);


                                              try {
                                              if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                                              lastJobExecutionResult = jobClient.getJobExecutionResult().get();
                                              } else {
                                              lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
                                              }


                                              jobListeners.forEach(
                                              jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));


                                              } catch (Throwable t) {
                                              Throwable strippedException = ExceptionUtils.stripExecutionException(t);


                                              jobListeners.forEach(
                                              jobListener -> {
                                              jobListener.onJobExecuted(null, strippedException);
                                              });
                                              ExceptionUtils.rethrowException(strippedException);
                                              }


                                              return lastJobExecutionResult;
                                              }

                                              这是一个名为 execute 的公共方法,它接收一个 jobName 参数并抛出异常。首先,它通过调用 executeAsync(jobName) 创建一个 JobClient。

                                              点击 executeAsync(jobName)

                                                /**
                                                * 这个方法异步触发程序执行。环境将执行程序中所有导致“sink”操作的部分。“sink”操作例如打印结果({@link DataSet#print()}),
                                                * 写入结果(例如{@link DataSet#writeAsText(String)},{@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)})
                                                * 或者使用{@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}创建的其他通用数据sink。
                                                *
                                                * <p>程序执行将会被记录并显示出给定的作业名称。
                                                *
                                                * @return 一个{@link JobClient},在提交成功后,可以用来与提交的作业进行通信。
                                                * @throws Exception Thrown, if the program submission fails.
                                                */
                                                @PublicEvolving
                                                public JobClient executeAsync(String jobName) throws Exception {
                                                checkNotNull(
                                                configuration.get(DeploymentOptions.TARGET),
                                                "No execution.target specified in your configuration file.");


                                                final Plan plan = createProgramPlan(jobName);
                                                final PipelineExecutorFactory executorFactory =
                                                executorServiceLoader.getExecutorFactory(configuration);


                                                checkNotNull(
                                                executorFactory,
                                                "Cannot find compatible factory for specified execution.target (=%s)",
                                                configuration.get(DeploymentOptions.TARGET));


                                                CompletableFuture<JobClient> jobClientFuture =
                                                executorFactory
                                                .getExecutor(configuration)
                                                .execute(plan, configuration, userClassloader);


                                                try {
                                                JobClient jobClient = jobClientFuture.get();
                                                jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
                                                return jobClient;
                                                } catch (Throwable t) {
                                                jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
                                                ExceptionUtils.rethrow(t);


                                                // make javac happy, this code path will not be reached
                                                return null;
                                                }
                                                }
                                                这段代码定义了一个名为executeAsync的公有方法,该方法在异步执行作业时被调用。它首先检查配置文件中是否指定了执行目标,然后创建一个程序计划。接下来,它尝试找到一个与指定执行目标兼容的工厂。如果找到,它将使用该工厂创建一个执行器,并使用该执行器执行计划。执行器执行计划的结果将作为JobClient的CompletableFuture返回。最后,它将尝试获取JobClient,如果成功,它将对每个作业监听器调用onJobSubmitted方法,否则,它将抛出异常。如果抛出异常,它也会对每个作业监听器调用onJobSubmitted方法并重新抛出异常。
                                                这里需要注意的代码是
                                                final Plan plan = createProgramPlan(jobName);
                                                点击 createProgramPlan(jobName)
                                                  /**
                                                  * 创建程序的{@link Plan}。计划是所有数据源、数据接收器和操作以及它们如何交互的描述,作为一个可以使用{@link PipelineExecutor}执行的独立单元。
                                                  * 获取一个计划并使用执行器启动它是运行程序的另一种方式,只有当程序只包含分布式操作时才可能。这将自动启动一个新的执行阶段。
                                                  *
                                                  * @param jobName 附加到计划的名称(在日志和监控中显示)。
                                                  * @return The program's plan.
                                                  */
                                                  @Internal
                                                  public Plan createProgramPlan(String jobName) {
                                                  return createProgramPlan(jobName, true);
                                                  }
                                                  点击 createProgramPlan(jobName, true);
                                                    /**
                                                    * 创建程序的{@link Plan}。计划是所有数据源,数据接收器,操作以及它们如何交互的描述,作为可以用{@link PipelineExecutor}执行的独立单元。
                                                    * 获取计划并使用执行器启动它是运行程序的另一种方式,只有在程序仅由分布式操作组成时才可能。
                                                    *
                                                    * @param jobName 附加到计划的名称(在日志和监控中显示)。
                                                    * @param clearSinks 是否开始新的执行阶段。
                                                    * @return The program's plan.
                                                    */
                                                    @Internal
                                                    public Plan createProgramPlan(String jobName, boolean clearSinks) {
                                                    checkNotNull(jobName);


                                                    if (this.sinks.isEmpty()) {
                                                    if (wasExecuted) {
                                                    throw new RuntimeException(
                                                    "No new data sinks have been defined since the "
                                                    + "last execution. The last execution refers to the latest call to "
                                                    + "'execute()', 'count()', 'collect()', or 'print()'.");
                                                    } else {
                                                    throw new RuntimeException(
                                                    "No data sinks have been created yet. "
                                                    + "A program needs at least one sink that consumes data. "
                                                    + "Examples are writing the data set or printing it.");
                                                    }
                                                    }


                                                    final PlanGenerator generator =
                                                    new PlanGenerator(sinks, config, getParallelism(), cacheFile, jobName);
                                                    final Plan plan = generator.generate();


                                                    // clear all the sinks such that the next execution does not redo everything
                                                    if (clearSinks) {
                                                    this.sinks.clear();
                                                    wasExecuted = true;
                                                    }


                                                    return plan;
                                                    }
                                                    它是用来创建一个执行计划的。这个方法需要两个参数,一个是jobName,表示作业的名称,另一个是clearSinks,是一个布尔值,用来决定是否清除所有的数据接收器。
                                                    在方法的开始,它首先会检查jobName是否为空,如果为空则会抛出异常。然后,它会检查sinks(数据接收器)是否为空,如果为空并且已经执行过了,那么会抛出异常,因为在最后一次执行后没有定义新的数据接收器。如果还没有执行过,那么也会抛出异常,因为程序至少需要一个消费数据的接收器。
                                                    接下来,它会创建一个新的PlanGenerator对象,并使用sinks,config,并行度,缓存文件和jobName作为参数。然后,它会生成一个新的执行计划。
                                                    最后,如果clearSinks为真,那么它会清除所有的数据接收器,并将wasExecuted设置为真,这样下一次执行就不会重复执行相同的操作。
                                                    最终,这个方法返回生成的执行计划。
                                                    这里需要注意的代码是 final Plan plan = generator.generate();
                                                    点击 generator.generate();
                                                      public Plan generate() {
                                                      final Plan plan = createPlan();
                                                      registerGenericTypeInfoIfConfigured(plan);
                                                      registerCachedFiles(plan);


                                                      logTypeRegistrationDetails();
                                                      return plan;
                                                      }

                                                      点击 createPlan();

                                                        private Plan createPlan() {
                                                        final OperatorTranslation translator = new OperatorTranslation();
                                                        final Plan plan = translator.translateToPlan(sinks, jobName);


                                                        if (defaultParallelism > 0) {
                                                        plan.setDefaultParallelism(defaultParallelism);
                                                        }
                                                        plan.setExecutionConfig(config);
                                                        return plan;
                                                        }
                                                        这是一个名为createPlan的私有方法,用于创建一个执行计划。
                                                        首先,它创建了一个新的OperatorTranslation对象,这是一个转译器,用于将操作符转译为执行计划。
                                                        然后,它调用translateToPlan方法,传入sinks(数据接收器)和jobName(作业名称)作为参数,生成一个执行计划。
                                                        接着,它检查defaultParallelism(默认并行度)是否大于0,如果是,则将该值设置为执行计划的默认并行度。
                                                        然后,它将config(执行配置)设置为执行计划的配置。
                                                        最后,这个方法返回生成的执行计划。
                                                        这里需要注意的代码是 final Plan plan = translator.translateToPlan(sinks, jobName);
                                                        可以看到sinks 和 jobname 被生成了Plan
                                                        点击 translateToPlan(sinks, jobName);
                                                          public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
                                                          List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();


                                                          for (DataSink<?> sink : sinks) {
                                                          planSinks.add(translate(sink));
                                                          }


                                                          Plan p = new Plan(planSinks);
                                                          p.setJobName(jobName);
                                                          return p;
                                                          }
                                                          点击 new Plan(planSinks);
                                                            /**
                                                            * 创建一个新的程序计划,描述以给定数据接收器结束的数据流。作业的显示名称使用时间戳生成。
                                                            *
                                                            * <p>如果数据流的所有接收器都没有给出,那么可能不会完全翻译整个流程,而只翻译从给定的数据接收器反向遍历可达的流程的部分。
                                                            *
                                                            * @param sinks 这个集合将成为数据流的接收器。
                                                            */
                                                            public Plan(Collection<? extends GenericDataSinkBase<?>> sinks) {
                                                            this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
                                                            }
                                                            点击 this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
                                                              /**
                                                              * 创建一个具有给定名称和默认并行性的新程序计划,描述以给定数据接收器结束的数据流。
                                                              *
                                                              * <p>如果数据流的所有接收器都没有给到计划,那么整个流可能无法完全被转换。
                                                              *
                                                              * @param sinks 这个集合将会是工作的数据流的接收器。
                                                              * @param jobName 显示的工作名称。
                                                              * @param defaultParallelism 该任务的默认并行性。
                                                              */
                                                              public Plan(
                                                              Collection<? extends GenericDataSinkBase<?>> sinks,
                                                              String jobName,
                                                              int defaultParallelism) {
                                                              this.sinks.addAll(sinks);
                                                              this.jobName = jobName;
                                                              this.defaultParallelism = defaultParallelism;
                                                              }
                                                              这是一个构造函数,用于创建一个名为Plan的对象。
                                                              这个构造函数接收三个参数:
                                                              sinks:这是一个包含GenericDataSinkBase对象的集合。GenericDataSinkBase是一个基础类,通常用于定义如何将数据输出到外部存储系统。
                                                              jobName:这是一个字符串,表示作业的名称。
                                                              defaultParallelism:这是一个整数,表示作业的默认并行性。并行性是指在执行作业时,可以同时进行处理的任务的数量。
                                                              在构造函数的主体中,我们将sinks参数添加到this.sinks集合中(this.sinks是Plan对象的一个属性),然后分别将jobName和defaultParallelism参数赋值给this.jobName和this.defaultParallelism(这也是Plan对象的属性)。
                                                              这里需要注意代码 this.sinks.addAll(sinks);
                                                              至此,Sinks 转 Plan就完成了。
                                                              回到 executeAsync方法
                                                              注意代码 
                                                                CompletableFuture jobClientFuture =
                                                                executorFactory
                                                                .getExecutor(configuration)
                                                                .execute(plan, configuration, userClassloader);
                                                                点击 .execute(plan, configuration, userClassloader);
                                                                  public interface PipelineExecutor {


                                                                  /**
                                                                  * 根据提供的配置执行一个{@link Pipeline},并返回一个{@link JobClient},它允许与正在执行的作业进行交互,例如取消它或获取一个保存点。
                                                                  *
                                                                  * 调用者负责管理返回值的生命周期
                                                                  * {@link JobClient}。这意味着,例如{@code close()}应该在调用处显式调用。
                                                                  *
                                                                  * @param pipeline 要执行的{@link管道}
                                                                  * @param configuration the {@link配置}和必需的执行参数
                                                                  * @param userCodeClassloader 用来反序列化usercode的{@link类加载器}
                                                                  * @return a {@link CompletableFuture} {@link JobClient}对应管道。
                                                                  */
                                                                  CompletableFuture<JobClient> execute(
                                                                  final Pipeline pipeline,
                                                                  final Configuration configuration,
                                                                  final ClassLoader userCodeClassloader)
                                                                  throws Exception;
                                                                  }
                                                                  可以看到Plan被转换成了Pipeline
                                                                  查看实现类 LocalExecutor的excute()方法
                                                                    public CompletableFuture<JobClient> execute(
                                                                    Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
                                                                    throws Exception {
                                                                    checkNotNull(pipeline);
                                                                    checkNotNull(configuration);


                                                                    Configuration effectiveConfig = new Configuration();
                                                                    effectiveConfig.addAll(this.configuration);
                                                                    effectiveConfig.addAll(configuration);


                                                                    // we only support attached execution with the local executor.
                                                                    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));


                                                                    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);


                                                                    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
                                                                    .submitJob(jobGraph, userCodeClassloader);
                                                                    }
                                                                    注意代码 
                                                                      final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
                                                                      点击 getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
                                                                        private JobGraph getJobGraph(
                                                                        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
                                                                        throws MalformedURLException {
                                                                        // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
                                                                        // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
                                                                        // for now.
                                                                        if (pipeline instanceof Plan) {
                                                                        Plan plan = (Plan) pipeline;
                                                                        final int slotsPerTaskManager =
                                                                        configuration.getInteger(
                                                                        TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
                                                                        final int numTaskManagers =
                                                                        configuration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);


                                                                        plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
                                                                        }


                                                                        return PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
                                                                        }
                                                                        点击 getJobGraph(pipeline, configuration, userCodeClassloader);
                                                                          /**
                                                                          * 创建对应于提供的 {@link Pipeline} 的 {@link JobGraph}。
                                                                          *
                                                                          * @param pipeline 我们正在计算其作业图的管道。
                                                                          * @param configuration 具有必要信息的配置,例如要包含的jar和类路径,作业的并行性以及可能用于引导其状态的保存点设置。
                                                                          * @param userClassloader the classloader which can load user classes.
                                                                          * @return the corresponding {@link JobGraph}.
                                                                          */
                                                                          public static JobGraph getJobGraph(
                                                                          @Nonnull final Pipeline pipeline,
                                                                          @Nonnull final Configuration configuration,
                                                                          @Nonnull ClassLoader userClassloader)
                                                                          throws MalformedURLException {
                                                                          checkNotNull(pipeline);
                                                                          checkNotNull(configuration);


                                                                          final ExecutionConfigAccessor executionConfigAccessor =
                                                                          ExecutionConfigAccessor.fromConfiguration(configuration);
                                                                          final JobGraph jobGraph =
                                                                          FlinkPipelineTranslationUtil.getJobGraph(
                                                                          userClassloader,
                                                                          pipeline,
                                                                          configuration,
                                                                          executionConfigAccessor.getParallelism());


                                                                          configuration
                                                                          .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                                                                          .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));


                                                                          if (configuration.getBoolean(DeploymentOptions.ATTACHED)
                                                                          && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
                                                                          jobGraph.setInitialClientHeartbeatTimeout(
                                                                          configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
                                                                          }


                                                                          jobGraph.addJars(executionConfigAccessor.getJars());
                                                                          jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
                                                                          jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());


                                                                          return jobGraph;
                                                                          }
                                                                          点击 
                                                                            .getJobGraph(
                                                                            userClassloader,
                                                                            pipeline,
                                                                            configuration,
                                                                            executionConfigAccessor.getParallelism());
                                                                              public static JobGraph getJobGraph(
                                                                              ClassLoader userClassloader,
                                                                              Pipeline pipeline,
                                                                              Configuration optimizerConfiguration,
                                                                              int defaultParallelism) {


                                                                              FlinkPipelineTranslator pipelineTranslator =
                                                                              getPipelineTranslator(userClassloader, pipeline);


                                                                              return pipelineTranslator.translateToJobGraph(
                                                                              pipeline, optimizerConfiguration, defaultParallelism);
                                                                              }


                                                                              点击 .translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);
                                                                                public JobGraph translateToJobGraph(
                                                                                Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
                                                                                checkArgument(pipeline instanceof Plan, "Given pipeline is not a DataSet Plan.");


                                                                                Plan plan = (Plan) pipeline;
                                                                                setDefaultParallelism(plan, defaultParallelism);
                                                                                return compilePlan(plan, optimizerConfiguration);
                                                                                }

                                                                                查看实现类,点击 compilePlan(plan, optimizerConfiguration);

                                                                                  private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
                                                                                  Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
                                                                                  OptimizedPlan optimizedPlan = optimizer.compile(plan);


                                                                                  JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
                                                                                  return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
                                                                                  }

                                                                                  点击 compileJobGraph(optimizedPlan, plan.getJobId());

                                                                                    public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
                                                                                    if (program == null) {
                                                                                    throw new NullPointerException(
                                                                                    "Program is null, did you called " + "ExecutionEnvironment.execute()");
                                                                                    }


                                                                                    if (jobId == null) {
                                                                                    jobId = JobID.generate();
                                                                                    }


                                                                                    this.vertices = new HashMap<PlanNode, JobVertex>();
                                                                                    this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
                                                                                    this.chainedTasksInSequence = new ArrayList<TaskInChain>();
                                                                                    this.auxVertices = new ArrayList<JobVertex>();
                                                                                    this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
                                                                                    this.iterationStack = new ArrayList<IterationPlanNode>();


                                                                                    this.sharingGroup = new SlotSharingGroup();


                                                                                    ExecutionConfig executionConfig = program.getOriginalPlan().getExecutionConfig();


                                                                                    // this starts the traversal that generates the job graph
                                                                                    program.accept(this);


                                                                                    // sanity check that we are not somehow in an iteration at the end
                                                                                    if (this.currentIteration != null) {
                                                                                    throw new CompilerException(
                                                                                    "The graph translation ended prematurely, leaving an unclosed iteration.");
                                                                                    }


                                                                                    // finalize the iterations
                                                                                    for (IterationDescriptor iteration : this.iterations.values()) {
                                                                                    if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
                                                                                    finalizeBulkIteration(iteration);
                                                                                    } else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
                                                                                    finalizeWorksetIteration(iteration);
                                                                                    } else {
                                                                                    throw new CompilerException();
                                                                                    }
                                                                                    }


                                                                                    // now that the traversal is done, we have the chained tasks write their configs into their
                                                                                    // parents' configurations
                                                                                    for (TaskInChain tic : this.chainedTasksInSequence) {
                                                                                    TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
                                                                                    t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
                                                                                    }


                                                                                    // ----- attach the additional info to the job vertices, for display in the runtime monitor


                                                                                    attachOperatorNamesAndDescriptions();


                                                                                    // ----------- finalize the job graph -----------


                                                                                    for (JobVertex vertex : this.auxVertices) {
                                                                                    vertex.setSlotSharingGroup(sharingGroup);
                                                                                    }


                                                                                    final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
                                                                                    JobGraphUtils.prepareUserArtifactEntries(
                                                                                    program.getOriginalPlan().getCachedFiles().stream()
                                                                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                                                                                    jobId);


                                                                                    // create the job graph object
                                                                                    final JobGraph graph;
                                                                                    try {
                                                                                    graph =
                                                                                    JobGraphBuilder.newBatchJobGraphBuilder()
                                                                                    .setJobId(jobId)
                                                                                    .setJobName(program.getJobName())
                                                                                    .setExecutionConfig(executionConfig)
                                                                                    .addJobVertices(vertices.values())
                                                                                    .addJobVertices(auxVertices)
                                                                                    .addUserArtifacts(userArtifacts)
                                                                                    .build();


                                                                                    if (executionConfig.getSchedulerType().isPresent()
                                                                                    && executionConfig.getSchedulerType().get()
                                                                                    == JobManagerOptions.SchedulerType.AdaptiveBatch) {
                                                                                    graph.setDynamic(true);
                                                                                    }


                                                                                    } catch (IOException e) {
                                                                                    throw new CompilerException(
                                                                                    "Could not serialize the ExecutionConfig."
                                                                                    + "This indicates that non-serializable types (like custom serializers) were registered");
                                                                                    }


                                                                                    // release all references again
                                                                                    this.vertices = null;
                                                                                    this.chainedTasks = null;
                                                                                    this.chainedTasksInSequence = null;
                                                                                    this.auxVertices = null;
                                                                                    this.iterations = null;
                                                                                    this.iterationStack = null;


                                                                                    // return job graph
                                                                                    return graph;
                                                                                    }
                                                                                    至此,JobGraph 生成完成。
                                                                                    回到 .execute(plan, configuration, userClassloader);方法
                                                                                    点击 .submitJob(jobGraph, userCodeClassloader);

                                                                                      public CompletableFuture<JobClient> submitJob(
                                                                                      JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
                                                                                      MiniClusterConfiguration miniClusterConfig =
                                                                                      getMiniClusterConfig(jobGraph.getMaximumParallelism());
                                                                                      MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
                                                                                      miniCluster.start();


                                                                                      return miniCluster
                                                                                      .submitJob(jobGraph)
                                                                                      .thenApplyAsync(
                                                                                      FunctionUtils.uncheckedFunction(
                                                                                      submissionResult -> {
                                                                                      org.apache.flink.client.ClientUtils
                                                                                      .waitUntilJobInitializationFinished(
                                                                                      () ->
                                                                                      miniCluster
                                                                                      .getJobStatus(
                                                                                      submissionResult
                                                                                      .getJobID())
                                                                                      .get(),
                                                                                      () ->
                                                                                      miniCluster
                                                                                      .requestJobResult(
                                                                                      submissionResult
                                                                                      .getJobID())
                                                                                      .get(),
                                                                                      userCodeClassloader);
                                                                                      return submissionResult;
                                                                                      }))
                                                                                      .thenApply(
                                                                                      result ->
                                                                                      new MiniClusterJobClient(
                                                                                      result.getJobID(),
                                                                                      miniCluster,
                                                                                      userCodeClassloader,
                                                                                      MiniClusterJobClient.JobFinalizationBehavior
                                                                                      .SHUTDOWN_CLUSTER))
                                                                                      .whenComplete(
                                                                                      (ignored, throwable) -> {
                                                                                      if (throwable != null) {
                                                                                      // We failed to create the JobClient and must shutdown to ensure
                                                                                      // cleanup.
                                                                                      shutDownCluster(miniCluster);
                                                                                      }
                                                                                      })
                                                                                      .thenApply(Function.identity());
                                                                                      }


                                                                                      任务被正式提交,并返回相应结果。
                                                                                      至此,Flink Table API 版的 WordCount 执行流程源码就分析完了。
                                                                                      总结如下:
                                                                                      第一步:创建环境
                                                                                      第二步:读取数据源
                                                                                      第三步:编写处理流程
                                                                                      第四步:封装Sinks
                                                                                      第五步:Sinks转 Plan
                                                                                      第六步:Plan转 Pipeline
                                                                                      第七步:Pipeline 转 Job Grape
                                                                                      第八步:提交 JobGrape
                                                                                      第九步:返回运行结果
                                                                                      往期精彩:
                                                                                      文章合集

                                                                                      文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                                      评论