In earlier posts we covered how to modify and compile the Flink source code, and walked through:
How the Streaming API WordCount runs: a source-level walkthrough
A 10,000-word deep dive | Seeing the essence behind the surface | From WordCount to a detailed reading of the Flink Streaming API source code
Today we continue with how the DataSet API version of WordCount runs (the title of the original draft said "Table API", but the code below uses the batch ExecutionEnvironment/DataSet API throughout).
Let's look at the code first:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountWzy {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordCountWzy.class);

    public static void main(String[] args) throws Exception {
        // DATASET_DEPRECATION_INFO is a statically imported constant from the Flink
        // examples (the DataSet API deprecation notice)
        LOGGER.warn(DATASET_DEPRECATION_INFO);

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = WordCountData.getDefaultTextLineDataSet(env);

        DataSet<Tuple2<String, Integer>> counts =
                // split each line into (word, 1) pairs
                text.flatMap(new Tokenizer())
                        // group by tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        counts.print();
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs
     * of the form "(word, 1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

// WordCountData provides the default input (a separate class in the same package):
public class WordCountData {

    public static final String[] WORDS =
            new String[] {
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,",
                "And by opposing end them?--To die,--to sleep,--",
                "No more; and by a sleep to say we end",
                "The heartache, and the thousand natural shocks",
                "That flesh is heir to,--'tis a consummation",
                "Devoutly to be wish'd. To die,--to sleep;--",
                "To sleep! perchance to dream:--ay, there's the rub;",
                "For in that sleep of death what dreams may come,",
                "When we have shuffled off this mortal coil,",
                "Must give us pause: there's the respect",
                "That makes calamity of so long life;",
                "For who would bear the whips and scorns of time,",
                "The oppressor's wrong, the proud man's contumely,",
                "The pangs of despis'd love, the law's delay,",
                "The insolence of office, and the spurns",
                "That patient merit of the unworthy takes,",
                "When he himself might his quietus make",
                "With a bare bodkin? who would these fardels bear,",
                "To grunt and sweat under a weary life,",
                "But that the dread of something after death,--",
                "The undiscover'd country, from whose bourn",
                "No traveller returns,--puzzles the will,",
                "And makes us rather bear those ills we have",
                "Than fly to others that we know not of?",
                "Thus conscience does make cowards of us all;",
                "And thus the native hue of resolution",
                "Is sicklied o'er with the pale cast of thought;",
                "And enterprises of great pith and moment,",
                "With this regard, their currents turn awry,",
                "And lose the name of action.--Soft you now!",
                "The fair Ophelia!--Nymph, in thy orisons",
                "Be all my sins remember'd."
            };

    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
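Before stepping into the environment setup, it is worth seeing what the Tokenizer actually emits. A tiny self-contained sketch (plain Java, no Flink dependencies) that replays its splitting logic on the first input line:

public class TokenizerSketch {
    public static void main(String[] args) {
        String line = "To be, or not to be,--that is the question:--";
        // same logic as Tokenizer.flatMap: lowercase, split on runs of non-word characters
        String[] tokens = line.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {   // guards against a leading empty token
                System.out.println("(" + token + ", 1)");
            }
        }
        // prints: (to, 1) (be, 1) (or, 1) (not, 1) (to, 1) (be, 1) (that, 1) (is, 1) (the, 1) (question, 1)
    }
}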
01
Setting up the execution environment
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Click into getExecutionEnvironment():
/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment()}. If the program is
 * invoked from the command-line client and submitted to a cluster, this method returns the
 * execution environment of that cluster.
 *
 * @return The execution environment of the context in which the program is executed.
 */
public static ExecutionEnvironment getExecutionEnvironment() {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
            .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}
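The interesting part is the factory resolution: a factory registered for the current thread wins over the globally registered context factory, and a local environment is the fallback, which is what we get when running from the IDE. A minimal sketch of that resolution pattern, with illustrative parameter names rather than the actual Utils class:

// Sketch of the resolution order used above: thread-local factory first,
// then the static context factory, then a plain local environment.
static ExecutionEnvironment resolve(
        ThreadLocal<ExecutionEnvironmentFactory> threadLocalFactory,
        ExecutionEnvironmentFactory contextFactory) {
    ExecutionEnvironmentFactory factory =
            threadLocalFactory.get() != null ? threadLocalFactory.get() : contextFactory;
    return factory != null
            ? factory.createExecutionEnvironment()
            : ExecutionEnvironment.createLocalEnvironment();
}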
Click into createLocalEnvironment():
/**
 * Creates a {@link LocalEnvironment}. The local execution environment will run the program in
 * a multi-threaded fashion in the same JVM as the environment was created in. The default
 * parallelism of the local environment is the number of hardware contexts (CPU cores /
 * threads), unless a different value was set via {@link #setDefaultLocalParallelism(int)}.
 *
 * @return A local execution environment.
 */
public static LocalEnvironment createLocalEnvironment() {
    return createLocalEnvironment(defaultLocalDop);
}
Click into createLocalEnvironment(defaultLocalDop):
/**
 * Creates a {@link LocalEnvironment}. The local execution environment will run the program in
 * a multi-threaded fashion in the same JVM as the environment was created in. It will use the
 * parallelism specified in the parameter.
 *
 * @param parallelism The parallelism for the local environment.
 * @return A local execution environment with the specified parallelism.
 */
public static LocalEnvironment createLocalEnvironment(int parallelism) {
    return createLocalEnvironment(new Configuration(), parallelism);
}
Click into createLocalEnvironment(new Configuration(), parallelism):
/**
 * Creates a {@link LocalEnvironment} for executing Flink jobs.
 *
 * @param configuration to start the {@link LocalEnvironment} with.
 * @param defaultParallelism to initialize the {@link LocalEnvironment} with.
 * @return the {@link LocalEnvironment}.
 */
private static LocalEnvironment createLocalEnvironment(
        Configuration configuration, int defaultParallelism) {
    final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);
    if (defaultParallelism > 0) {
        localEnvironment.setParallelism(defaultParallelism);
    }
    return localEnvironment;
}
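You rarely call this private overload directly; the public entry points arrive at the same place. A quick usage sketch:

// Explicitly create a local environment with parallelism 4 ...
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);
// ... which is equivalent to setting it afterwards:
ExecutionEnvironment env2 = ExecutionEnvironment.createLocalEnvironment();
env2.setParallelism(4);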
Click into new LocalEnvironment(configuration):
/**
 * Creates a new local environment that configures its local executor with the given
 * configuration.
 *
 * @param config The configuration used to configure the local executor.
 */
public LocalEnvironment(Configuration config) {
    super(validateAndGetConfiguration(config));
}
Click into super(validateAndGetConfiguration(config)):
/**
 * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
 * configure the {@link PipelineExecutor}.
 */
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration) {
    this(configuration, null);
}
Click into this(configuration, null):
/**
 * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
 * configure the {@link PipelineExecutor}.
 *
 * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
 */
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
    this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
}
Click into this(new DefaultExecutorServiceLoader(), configuration, userClassloader):
/**
 * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
 * configure the {@link PipelineExecutor}.
 *
 * <p>In addition, this constructor allows specifying the {@link PipelineExecutorServiceLoader}
 * and the user code {@link ClassLoader}.
 */
@PublicEvolving
public ExecutionEnvironment(
        final PipelineExecutorServiceLoader executorServiceLoader,
        final Configuration configuration,
        final ClassLoader userClassloader) {
    this.executorServiceLoader = checkNotNull(executorServiceLoader);
    this.configuration = new Configuration(checkNotNull(configuration));
    this.userClassloader =
            userClassloader == null ? getClass().getClassLoader() : userClassloader;

    // The configuration of a job or an operator can be specified at the following places:
    //   i) at the operator level, e.g. using SingleOutputStreamOperator.setParallelism()
    //   ii) programmatically, e.g. using env.setRestartStrategy()
    //   iii) in the configuration passed here
    //
    // If specified in multiple places, the priority order is exactly the one above. Given
    // this, it is safe to overwrite the execution-config defaults here, because all the other
    // ways assume that the environment is already instantiated, so they will overwrite the
    // values passed here.
    this.configure(this.configuration, this.userClassloader);
}
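The three configuration levels described in that comment can be seen side by side in user code. A sketch of all three, using the batch API from this article (the option name is assumed from flink-core's CoreOptions):

Configuration conf = new Configuration();
conf.set(CoreOptions.DEFAULT_PARALLELISM, 8);        // iii) configuration passed to the environment
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

env.setParallelism(4);                               // ii) programmatic, overrides iii)

env.fromElements("a", "b", "c")
        .map(s -> s.toUpperCase())
        .setParallelism(2);                          // i) operator level, overrides ii)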
02
Reading the data source
return env.fromElements(WORDS);
Click into env.fromElements(WORDS):
@SafeVarargs
public final <X> DataSource<X> fromElements(X... data) {
    if (data == null) {
        throw new IllegalArgumentException("The data must not be null.");
    }
    if (data.length == 0) {
        throw new IllegalArgumentException("The number of elements must not be zero.");
    }

    TypeInformation<X> typeInfo;
    try {
        typeInfo = TypeExtractor.getForObject(data[0]);
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not create TypeInformation for type "
                        + data[0].getClass().getName()
                        + "; please specify the TypeInformation manually via "
                        + "ExecutionEnvironment#fromElements(Collection, TypeInformation)",
                e);
    }

    return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
}
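Note that the element type is inferred from the first element only. If TypeExtractor cannot infer it (e.g. for generic container types), you can supply the TypeInformation yourself, as the error message suggests. A sketch:

// Supplying TypeInformation explicitly instead of relying on extraction:
DataSet<Tuple2<String, Integer>> pairs = env.fromCollection(
        Arrays.asList(Tuple2.of("a", 1), Tuple2.of("b", 2)),
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));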
Click into fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()):
private <X> DataSource<X> fromCollection(
        Collection<X> data, TypeInformation<X> type, String callLocationName) {
    CollectionInputFormat.checkCollection(data, type.getTypeClass());
    return new DataSource<>(
            this,
            new CollectionInputFormat<>(data, type.createSerializer(config)),
            type,
            callLocationName);
}
Click into new DataSource:
/**
 * Creates a new data source.
 *
 * @param context The environment in which the data source gets executed.
 * @param inputFormat The input format that the data source executes.
 * @param type The type of the elements produced by this input format.
 */
public DataSource(
        ExecutionEnvironment context,
        InputFormat<OUT, ?> inputFormat,
        TypeInformation<OUT> type,
        String dataSourceLocationName) {
    super(context, type);

    this.dataSourceLocationName = dataSourceLocationName;

    if (inputFormat == null) {
        throw new IllegalArgumentException("The input format may not be null.");
    }

    this.inputFormat = inputFormat;

    if (inputFormat instanceof NonParallelInput) {
        this.parallelism = 1;
    }
}
Click into new CollectionInputFormat<>(data, type.createSerializer(config)). (CollectionInputFormat implements NonParallelInput, which is why the constructor above pins the source's parallelism to 1.)
public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
    if (dataSet == null) {
        throw new NullPointerException();
    }
    this.serializer = serializer;
    this.dataSet = dataSet;
}
03
The data processing logic
DataSet<Tuple2<String, Integer>> counts =
        // split each line into (word, 1) pairs
        text.flatMap(new Tokenizer())
                // group by tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
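The integer arguments are positional tuple-field keys: groupBy(0) groups on the word (field f0) and sum(1) adds up the counts (field f1). For readers who prefer explicit keys, here is a sketch of an equivalent formulation with a KeySelector and a reduce (the sum() shorthand itself only works with field-position or expression keys in the DataSet API, hence the reduce):

// Equivalent to groupBy(0).sum(1), but with the key and aggregation made explicit.
DataSet<Tuple2<String, Integer>> counts2 =
        text.flatMap(new Tokenizer())
                .groupBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;   // the word
                    }
                })
                .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));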
04
Executing the job
counts.print();
Click into print():
/**
 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the
 * JVM that calls the print() method. For programs that are executed in a cluster, this method
 * needs to gather the contents of the DataSet back to the client, to print it there.
 *
 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 *
 * <p>This method immediately triggers the program execution, similar to the {@link #collect()}
 * and {@link #count()} methods.
 *
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    List<T> elements = collect();
    for (T e : elements) {
        System.out.println(e);
    }
}
This public print() method may throw an exception. It first calls collect() to gather the elements into a list named elements, then iterates over that list and prints each element e to the console. The key point is that collect() is what actually triggers job execution.
Click into collect():
/**
 * Convenience method to get the elements of a DataSet as a List. As DataSets can contain a lot
 * of data, this method should be used with caution.
 *
 * @return A List containing the elements of the DataSet.
 */
public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer =
            getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();

    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        try {
            return SerializedListAccumulator.deserializeList(accResult, serializer);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find type class of collected data type.", e);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Serialization error while deserializing collected data", e);
        }
    } else {
        throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
    }
}
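The mechanism deserves a pause: the sink serializes every record into a list accumulator keyed by the random id, and once execute() returns, the client deserializes that list out of the JobExecutionResult. A self-contained sketch of the same round trip, with plain Java serialization standing in for Flink's TypeSerializer and SerializedListAccumulator:

import java.io.*;
import java.util.*;

public class CollectSketch {
    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] bytes) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> accumulator = new ArrayList<>();        // stands in for the list accumulator
        for (String record : List.of("to", "be", "or")) {    // the "sink side"
            accumulator.add(serialize(record));
        }
        List<String> clientSide = new ArrayList<>();         // the "client side"
        for (byte[] bytes : accumulator) {
            clientSide.add(deserialize(bytes));
        }
        System.out.println(clientSide);                      // [to, be, or]
    }
}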
Click into output(new Utils.CollectHelper<>(id, serializer)).name("collect()"):
/**
 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data
 * sinks or transformations) at the same time.
 *
 * @param outputFormat The OutputFormat to process the DataSet.
 * @return The DataSink that processes the DataSet.
 * @see OutputFormat
 * @see DataSink
 */
public DataSink<T> output(OutputFormat<T> outputFormat) {
    Preconditions.checkNotNull(outputFormat);

    // configure the type if needed
    if (outputFormat instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
    }

    DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
    this.context.registerDataSink(sink);
    return sink;
}
Click into this.context.registerDataSink(sink) if you are curious: it simply adds the sink to the environment's internal list of sinks, which createProgramPlan() will consume later.
Back in collect(), click into execute():
/**
 * Triggers the program execution. The environment will execute all parts of the program that
 * result in a "sink" operation. Sink operations are for example printing results
 * ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other
 * generic data sinks created with
 * {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with a generated default name.
 *
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program executions fails.
 */
public JobExecutionResult execute() throws Exception {
    return execute(getJobName());
}
Click into execute(getJobName()):
/**
 * Triggers the program execution. The environment will execute all parts of the program that
 * result in a "sink" operation. Sink operations are for example printing results
 * ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other
 * generic data sinks created with
 * {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with the given job name.
 *
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program executions fails.
 */
public JobExecutionResult execute(String jobName) throws Exception {
    final JobClient jobClient = executeAsync(jobName);

    try {
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            lastJobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));
    } catch (Throwable t) {
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(null, strippedException));
        ExceptionUtils.rethrowException(strippedException);
    }

    return lastJobExecutionResult;
}
This public execute(jobName) method first obtains a JobClient by calling executeAsync(jobName). If the deployment is attached (execution.attached=true), it then blocks on jobClient.getJobExecutionResult().get() until the job finishes; otherwise it returns a DetachedJobExecutionResult that only carries the JobID.
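A small configuration sketch for the attached/detached distinction (as we will see below, the local executor only supports attached execution):

Configuration conf = new Configuration();
conf.set(DeploymentOptions.ATTACHED, true);   // execute() blocks until the job finishes
// with ATTACHED=false, execute() would return a DetachedJobExecutionResult immediately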
Click into executeAsync(jobName):
/**
 * Triggers the program execution asynchronously. The environment will execute all parts of the
 * program that result in a "sink" operation. Sink operations are for example printing results
 * ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other
 * generic data sinks created with
 * {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with the given job name.
 *
 * @return A {@link JobClient} that can be used to communicate with the submitted job.
 * @throws Exception Thrown, if the program submission fails.
 */
@PublicEvolving
public JobClient executeAsync(String jobName) throws Exception {
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");

    final Plan plan = createProgramPlan(jobName);
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(plan, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (Throwable t) {
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
        ExceptionUtils.rethrow(t);

        // make javac happy, this code path will not be reached
        return null;
    }
}
Click into createProgramPlan(jobName):

/**
 * Creates the program's {@link Plan}. The plan is a description of all data sources, data
 * sinks, and operations and how they interact, as an isolated unit that can be executed with a
 * {@link PipelineExecutor}. Obtaining a plan and starting it with an executor is an alternative
 * way to run a program, which is only possible if the program consists only of distributed
 * operations. This automatically starts a new stage of execution.
 *
 * @param jobName The name attached to the plan (displayed in logs and monitoring).
 * @return The program's plan.
 */
@Internal
public Plan createProgramPlan(String jobName) {
    return createProgramPlan(jobName, true);
}
/**
 * Creates the program's {@link Plan}. The plan is a description of all data sources, data
 * sinks, and operations and how they interact, as an isolated unit that can be executed with a
 * {@link PipelineExecutor}.
 *
 * @param jobName The name attached to the plan (displayed in logs and monitoring).
 * @param clearSinks Whether to start a new stage of execution (i.e. clear the registered sinks).
 * @return The program's plan.
 */
@Internal
public Plan createProgramPlan(String jobName, boolean clearSinks) {
    checkNotNull(jobName);

    if (this.sinks.isEmpty()) {
        if (wasExecuted) {
            throw new RuntimeException(
                    "No new data sinks have been defined since the "
                            + "last execution. The last execution refers to the latest call to "
                            + "'execute()', 'count()', 'collect()', or 'print()'.");
        } else {
            throw new RuntimeException(
                    "No data sinks have been created yet. "
                            + "A program needs at least one sink that consumes data. "
                            + "Examples are writing the data set or printing it.");
        }
    }

    final PlanGenerator generator =
            new PlanGenerator(sinks, config, getParallelism(), cacheFile, jobName);
    final Plan plan = generator.generate();

    // clear all the sinks such that the next execution does not redo everything
    if (clearSinks) {
        this.sinks.clear();
        wasExecuted = true;
    }

    return plan;
}
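The sink clearing at the end explains a classic error message: print() already executed the job and cleared the sinks, so a subsequent execute() with no new sinks fails. A sketch:

counts.print();   // triggers execution via collect(), then clears the sinks
env.execute();    // throws: "No new data sinks have been defined since the last execution. ..."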
Click into generator.generate():

public Plan generate() {
    final Plan plan = createPlan();
    registerGenericTypeInfoIfConfigured(plan);
    registerCachedFiles(plan);
    logTypeRegistrationDetails();
    return plan;
}
Click into createPlan():
private Plan createPlan() {
    final OperatorTranslation translator = new OperatorTranslation();
    final Plan plan = translator.translateToPlan(sinks, jobName);
    if (defaultParallelism > 0) {
        plan.setDefaultParallelism(defaultParallelism);
    }
    plan.setExecutionConfig(config);
    return plan;
}
Click into translateToPlan(sinks, jobName):

public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
    List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();

    for (DataSink<?> sink : sinks) {
        planSinks.add(translate(sink));
    }

    Plan p = new Plan(planSinks);
    p.setJobName(jobName);
    return p;
}
Click into new Plan(planSinks):

/**
 * Creates a new program plan, describing the data flow that ends at the given data sinks. The
 * display name for the job is generated using a timestamp.
 *
 * <p>If not all of the sinks of a data flow are given, the flow might not be translated
 * entirely, but only the parts of the flow reachable by traversing backwards from the given
 * data sinks.
 *
 * @param sinks The collection that will become the sinks of the data flow.
 */
public Plan(Collection<? extends GenericDataSinkBase<?>> sinks) {
    this(sinks, ExecutionConfig.PARALLELISM_DEFAULT);
}
This eventually delegates to the full constructor:

/**
 * Creates a new program plan with the given name and default parallelism, describing the data
 * flow that ends at the given data sinks.
 *
 * <p>If not all of the sinks of a data flow are given to the plan, the flow might not be
 * translated entirely.
 *
 * @param sinks The collection that will become the sinks of the job's data flow.
 * @param jobName The job's display name.
 * @param defaultParallelism The default parallelism for the job.
 */
public Plan(
        Collection<? extends GenericDataSinkBase<?>> sinks,
        String jobName,
        int defaultParallelism) {
    this.sinks.addAll(sinks);
    this.jobName = jobName;
    this.defaultParallelism = defaultParallelism;
}
Back in executeAsync(), the plan is handed to the pipeline executor:

CompletableFuture<JobClient> jobClientFuture =
        executorFactory.getExecutor(configuration).execute(plan, configuration, userClassloader);
public interface PipelineExecutor {

    /**
     * Executes a {@link Pipeline} based on the provided configuration and returns a
     * {@link JobClient} which allows interacting with the job being executed, e.g. cancelling
     * it or taking a savepoint.
     *
     * <p>The caller is responsible for managing the lifecycle of the returned
     * {@link JobClient}. This means that e.g. {@code close()} should be called explicitly at
     * the call site.
     *
     * @param pipeline the {@link Pipeline} to execute
     * @param configuration the {@link Configuration} with the required execution parameters
     * @param userCodeClassloader the {@link ClassLoader} to deserialize user code
     * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the
     *     pipeline
     */
    CompletableFuture<JobClient> execute(
            final Pipeline pipeline,
            final Configuration configuration,
            final ClassLoader userCodeClassloader)
            throws Exception;
}
For our locally-run program, the implementation is LocalExecutor#execute:

public CompletableFuture<JobClient> execute(
        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
        throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
            .submitJob(jobGraph, userCodeClassloader);
}
Click into getJobGraph(pipeline, effectiveConfig, userCodeClassloader):
private JobGraph getJobGraph(
        Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
        throws MalformedURLException {
    // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
    // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the
    // behaviour for now.
    if (pipeline instanceof Plan) {
        Plan plan = (Plan) pipeline;
        final int slotsPerTaskManager =
                configuration.getInteger(
                        TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
        final int numTaskManagers =
                configuration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

        plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
    }

    return PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
}
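A quick numeric illustration of that quirk, with hypothetical values:

// With 4 slots per TaskManager and 2 local TaskManagers, the plan's
// default parallelism becomes 4 * 2 = 8.
Configuration conf = new Configuration();
conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);                 // taskmanager.numberOfTaskSlots
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);  // local.number-taskmanager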
/**
 * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
 *
 * @param pipeline the pipeline whose job graph we are computing.
 * @param configuration the configuration with the necessary information, such as the jars and
 *     classpaths to be included, the parallelism of the job, and potential savepoint settings
 *     used to bootstrap its state.
 * @param userClassloader the classloader which can load user classes.
 * @return the corresponding {@link JobGraph}.
 */
public static JobGraph getJobGraph(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull ClassLoader userClassloader)
        throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    final ExecutionConfigAccessor executionConfigAccessor =
            ExecutionConfigAccessor.fromConfiguration(configuration);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraph(
                    userClassloader,
                    pipeline,
                    configuration,
                    executionConfigAccessor.getParallelism());

    configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

    if (configuration.getBoolean(DeploymentOptions.ATTACHED)
            && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
        jobGraph.setInitialClientHeartbeatTimeout(
                configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
    }

    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

    return jobGraph;
}
Click into FlinkPipelineTranslationUtil.getJobGraph(
        userClassloader, pipeline, configuration, executionConfigAccessor.getParallelism()):
public static JobGraph getJobGraph(
        ClassLoader userClassloader,
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {

    FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(userClassloader, pipeline);

    return pipelineTranslator.translateToJobGraph(
            pipeline, optimizerConfiguration, defaultParallelism);
}
For a DataSet Plan the translator is the PlanTranslator:

public JobGraph translateToJobGraph(
        Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
    checkArgument(pipeline instanceof Plan, "Given pipeline is not a DataSet Plan.");

    Plan plan = (Plan) pipeline;
    setDefaultParallelism(plan, defaultParallelism);
    return compilePlan(plan, optimizerConfiguration);
}
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
    Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
    OptimizedPlan optimizedPlan = optimizer.compile(plan);

    JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
    return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}
Click into compileJobGraph(optimizedPlan, plan.getJobId()):
public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
    if (program == null) {
        throw new NullPointerException(
                "Program is null, did you called " + "ExecutionEnvironment.execute()");
    }

    if (jobId == null) {
        jobId = JobID.generate();
    }

    this.vertices = new HashMap<PlanNode, JobVertex>();
    this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
    this.chainedTasksInSequence = new ArrayList<TaskInChain>();
    this.auxVertices = new ArrayList<JobVertex>();
    this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
    this.iterationStack = new ArrayList<IterationPlanNode>();

    this.sharingGroup = new SlotSharingGroup();

    ExecutionConfig executionConfig = program.getOriginalPlan().getExecutionConfig();

    // this starts the traversal that generates the job graph
    program.accept(this);

    // sanity check that we are not somehow in an iteration at the end
    if (this.currentIteration != null) {
        throw new CompilerException(
                "The graph translation ended prematurely, leaving an unclosed iteration.");
    }

    // finalize the iterations
    for (IterationDescriptor iteration : this.iterations.values()) {
        if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
            finalizeBulkIteration(iteration);
        } else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
            finalizeWorksetIteration(iteration);
        } else {
            throw new CompilerException();
        }
    }

    // now that the traversal is done, we have the chained tasks write their configs into their
    // parents' configurations
    for (TaskInChain tic : this.chainedTasksInSequence) {
        TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
        t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
    }

    // ----- attach the additional info to the job vertices, for display in the runtime monitor
    attachOperatorNamesAndDescriptions();

    // ----------- finalize the job graph -----------
    for (JobVertex vertex : this.auxVertices) {
        vertex.setSlotSharingGroup(sharingGroup);
    }

    final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
            JobGraphUtils.prepareUserArtifactEntries(
                    program.getOriginalPlan().getCachedFiles().stream()
                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                    jobId);

    // create the job graph object
    final JobGraph graph;
    try {
        graph =
                JobGraphBuilder.newBatchJobGraphBuilder()
                        .setJobId(jobId)
                        .setJobName(program.getJobName())
                        .setExecutionConfig(executionConfig)
                        .addJobVertices(vertices.values())
                        .addJobVertices(auxVertices)
                        .addUserArtifacts(userArtifacts)
                        .build();

        if (executionConfig.getSchedulerType().isPresent()
                && executionConfig.getSchedulerType().get()
                        == JobManagerOptions.SchedulerType.AdaptiveBatch) {
            graph.setDynamic(true);
        }
    } catch (IOException e) {
        throw new CompilerException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }

    // release all references again
    this.vertices = null;
    this.chainedTasks = null;
    this.chainedTasksInSequence = null;
    this.auxVertices = null;
    this.iterations = null;
    this.iterationStack = null;

    // return job graph
    return graph;
}
The JobGraph is then submitted to a fresh MiniCluster via PerJobMiniClusterFactory#submitJob:

public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader)
        throws Exception {
    MiniClusterConfiguration miniClusterConfig =
            getMiniClusterConfig(jobGraph.getMaximumParallelism());
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    miniCluster.start();

    return miniCluster
            .submitJob(jobGraph)
            .thenApplyAsync(
                    FunctionUtils.uncheckedFunction(
                            submissionResult -> {
                                org.apache.flink.client.ClientUtils
                                        .waitUntilJobInitializationFinished(
                                                () ->
                                                        miniCluster
                                                                .getJobStatus(submissionResult.getJobID())
                                                                .get(),
                                                () ->
                                                        miniCluster
                                                                .requestJobResult(submissionResult.getJobID())
                                                                .get(),
                                                userCodeClassloader);
                                return submissionResult;
                            }))
            .thenApply(
                    result ->
                            new MiniClusterJobClient(
                                    result.getJobID(),
                                    miniCluster,
                                    userCodeClassloader,
                                    MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
            .whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            // We failed to create the JobClient and must shutdown to ensure
                            // cleanup.
                            shutDownCluster(miniCluster);
                        }
                    })
            .thenApply(Function.identity());
}
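From here control unwinds back through executeAsync() and execute() to collect(), which reads the accumulator and prints the word counts. As a closing sketch, this is what the returned JobClient is typically used for (illustrative, using the methods we saw in execute()):

JobClient client = env.executeAsync("WordCount");                  // returns once the job is submitted
JobExecutionResult result = client.getJobExecutionResult().get();  // blocks until the job finishes
System.out.println(
        "Job " + client.getJobID() + " finished in " + result.getNetRuntime() + " ms");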




