
A full source-code walkthrough of Flink's local-mode startup

开发架构二三事 2020-05-17

This article uses a WordCount example to briefly analyze the startup flow of the key Flink components, such as the TaskManager and the JobMaster.

Example code

object WordCount {

  def main(args: Array[String]) {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }

    // text.flatMap(_.toLowerCase.split("\\W+").filter(p => p.nonEmpty)).map(a => (a,1)).groupBy(0).sum(1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      counts.print()
    }
  }
}

This program runs a WordCount job; we use it as the entry point for tracing the whole execution flow.

The JobGraph generation flow

For now we simply walk through the flow step by step; later articles will analyze the individual pieces in more detail.

org.apache.flink.api.scala.DataSet#print:

def print(): Unit = {
  javaSet.print()
}

org.apache.flink.api.java.DataSet#print()

public void print() throws Exception {
    List<T> elements = collect();
    for (T e: elements) {
        System.out.println(e);
    }
}

org.apache.flink.api.java.DataSet#collect

public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();

    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        // ...

This is the execution and result-collection path: the job is executed, and the collected records travel back to the client through an accumulator registered under the random id.
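To make that accumulator round trip concrete, here is a minimal, self-contained sketch of the idea. The class ResultCollector and the use of plain Java serialization are illustrative stand-ins (Flink actually uses Utils.CollectHelper together with its TypeSerializer and a serialized-list accumulator): the job-side sink serializes each record into a list of byte arrays keyed by the random id, and after execution the client deserializes that list from the JobExecutionResult.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

class ResultCollector<T> {
    // plays the role of the byte[]-list accumulator keyed by the random id
    private final ArrayList<byte[]> collected = new ArrayList<>();

    // job side: the sink calls this once per record
    void add(T record) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(record); // Flink serializes with its TypeSerializer instead
        }
        collected.add(bos.toByteArray());
    }

    // client side: called after the job finished, mirroring getAccumulatorResult(id)
    @SuppressWarnings("unchecked")
    List<T> toList() throws IOException, ClassNotFoundException {
        List<T> result = new ArrayList<>(collected.size());
        for (byte[] bytes : collected) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                result.add((T) in.readObject());
            }
        }
        return result;
    }
}

public class CollectSketch {
    public static void main(String[] args) throws Exception {
        ResultCollector<String> accumulator = new ResultCollector<>();
        accumulator.add("hello");
        accumulator.add("world");
        System.out.println(accumulator.toList()); // [hello, world]
    }
}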

org.apache.flink.api.java.ExecutionEnvironment#execute()

public JobExecutionResult execute() throws Exception {
    return execute(getDefaultName());
}
org.apache.flink.api.java.ExecutionEnvironment#execute(java.lang.String)

public JobExecutionResult execute(String jobName) throws Exception {
    final JobClient jobClient = executeAsync(jobName);

    try {
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
        } else {
            lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
        // ...

This is the method through which the job is actually submitted for execution.

org.apache.flink.api.java.ExecutionEnvironment#executeAsync(java.lang.String)

@PublicEvolving
public JobClient executeAsync(String jobName) throws Exception {
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
    // both StreamGraph and Plan implement Pipeline
    // generate the execution plan
    final Plan plan = createProgramPlan(jobName);
    // the pipeline executor factory
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
        executorFactory,
        "Cannot find compatible factory for specified execution.target (=%s)",
        configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(plan, configuration);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
        // ...

Pipeline has two implementations: Plan and StreamGraph. In this batch example a Plan is generated; in the streaming case it would be a StreamGraph instead.

Let's follow the job execution further down.

org.apache.flink.client.deployment.executors.LocalExecutor#execute

@Override
public CompletableFuture<? extends JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}

The pipeline here is either a Plan or a StreamGraph; getJobGraph translates whichever Pipeline it receives into a JobGraph, which is then submitted via submitJob.
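That translation step can be pictured as a small dispatch over the concrete Pipeline type. The sketch below is deliberately simplified with hypothetical types (the real dispatch lives in Flink's client, and the real translators produce a JobGraph rather than a String), but it captures why a batch Plan and a streaming StreamGraph can flow through the same execute path:

import java.util.Arrays;
import java.util.List;

interface Pipeline {}

final class Plan implements Pipeline {}         // batch execution plan
final class StreamGraph implements Pipeline {}  // streaming topology

interface PipelineTranslator {
    boolean canTranslate(Pipeline pipeline);
    String toJobGraph(Pipeline pipeline); // returns a JobGraph in Flink; String keeps the sketch short
}

class PlanTranslator implements PipelineTranslator {
    public boolean canTranslate(Pipeline p) { return p instanceof Plan; }
    public String toJobGraph(Pipeline p) { return "JobGraph(from Plan)"; }
}

class StreamGraphTranslator implements PipelineTranslator {
    public boolean canTranslate(Pipeline p) { return p instanceof StreamGraph; }
    public String toJobGraph(Pipeline p) { return "JobGraph(from StreamGraph)"; }
}

public class TranslationSketch {
    public static void main(String[] args) {
        List<PipelineTranslator> translators =
            Arrays.asList(new PlanTranslator(), new StreamGraphTranslator());

        Pipeline pipeline = new Plan(); // this article's batch example
        String jobGraph = translators.stream()
            .filter(t -> t.canTranslate(pipeline))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("no translator for " + pipeline))
            .toJobGraph(pipeline);

        System.out.println(jobGraph); // -> JobGraph(from Plan)
    }
}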

org.apache.flink.client.program.PerJobMiniClusterFactory#submitJob

/**
 * Starts a {@link MiniCluster} and submits a job.
 */
public CompletableFuture<? extends JobClient> submitJob(JobGraph jobGraph) throws Exception {
    MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    miniCluster.start();

    return miniCluster
        .submitJob(jobGraph)
        .thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
        .whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // We failed to create the JobClient and must shutdown to ensure cleanup.
                shutDownCluster(miniCluster);
            }
        });
}

Submitting the job first creates a MiniCluster, and miniCluster.start() then performs the actual startup work.
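For reference, a MiniCluster can also be driven directly, which is essentially what PerJobMiniClusterFactory does for us. A minimal sketch, written from memory against the Flink 1.10/1.11-era API this article walks through (treat the exact builder methods as assumptions):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterSketch {
    public static void run(JobGraph jobGraph) throws Exception {
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumTaskManagers(1)          // one local TaskExecutor
            .setNumSlotsPerTaskManager(1)   // with a single slot
            .build();

        // start() brings up the RPC services, blob server, TaskManagers,
        // and the dispatcher/resource-manager component, as shown below
        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph); // submit and wait for the result
        }
    }
}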

org.apache.flink.runtime.minicluster.MiniCluster#start:

public void start() throws Exception {
    synchronized (lock) {
        // ...
        miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // shared (single RPC service) mode
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createLocalRpcService(configuration);
                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // dedicated mode
                // start a new service per component, possibly with custom bind addresses
                // ...
                // we always need the 'commonRpcService' for auxiliary calls
                // bind to the JobManager address with port 0
                commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0);
            }

            // start the metrics actor service
            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
                metricRegistry,
                RpcUtils.getHostname(commonRpcService),
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            // create the IO executor, a fixed thread pool here
            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            // create and start the BlobServer
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            // start the heartbeat services
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );

            // start the TaskManagers
            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagreComponentRpcServiceFactory, metricQueryServiceRetriever);

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();

            // retriever for the dispatcher gateway
            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            // retriever for the resource manager gateway
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            // web monitor
            webMonitorLeaderRetriever = new LeaderRetriever();

            // start the leader retrieval services
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
            // ...

In shared-RPC mode, commonRpcService = createLocalRpcService(configuration) builds a local RpcService; in the dedicated mode shown above, createRemoteRpcService is called instead, which uses AkkaRpcServiceUtils.remoteServiceBuilder to build and start the underlying Akka-based actor service:

protected RpcService createRemoteRpcService(
        Configuration configuration,
        String bindAddress,
        int bindPort) throws Exception {
    return AkkaRpcServiceUtils.remoteServiceBuilder(configuration, bindAddress, String.valueOf(bindPort))
        .withBindAddress(bindAddress)
        .withBindPort(bindPort)
        .withCustomConfig(AkkaUtils.testDispatcherConfig())
        .createAndStart();
}

org.apache.flink.runtime.minicluster.MiniCluster#startTaskManagers:

@GuardedBy("lock")
private void startTaskManagers() throws Exception {
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();

    LOG.info("Starting {} TaskManger(s)", numTaskManagers);

    for (int i = 0; i < numTaskManagers; i++) {
        startTaskExecutor();
    }
}

org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor:

@VisibleForTesting
void startTaskExecutor() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
            configuration,
            new ResourceID(UUID.randomUUID().toString()),
            taskManagerRpcServiceFactory.createRpcService(),
            haServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            useLocalCommunication(),
            taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager assembles the TaskExecutor; as part of that it builds the TaskManagerServices:

org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration:

/**
 * Creates and returns the task manager services.
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param taskManagerMetricGroup metric group of the task manager
 * @param taskIOExecutor executor for async IO operations
 * @return task manager components
 * @throws Exception
 */
public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        MetricGroup taskManagerMetricGroup,
        Executor taskIOExecutor) throws Exception {

    // pre-start checks
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    // create the task event dispatcher
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // start the I/O manager, it will create some temp directories.
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup);
    final int listeningDataPort = shuffleEnvironment.start();

    // create and start the KvStateService
    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getExternalAddress(),
        // we expose the task manager location with the listening port
        // iff the external data port is not explicitly defined
        taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
            taskManagerServicesConfiguration.getExternalDataPort() :
            listeningDataPort);

    // broadcast variable management
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    // slot table
    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize());

    // job manager table
    final JobManagerTable jobManagerTable = new JobManagerTable();

    // JobLeaderService
    final JobLeaderService jobLeaderService = new JobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    // local state stores manager
    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        taskIOExecutor);

    // IO executor
    final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("taskexecutor-io"));

    // the aggregate of the TaskManager-side services
    return new TaskManagerServices(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobManagerTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher,
        ioExecutor);
}

This creates and starts the whole collection of TaskManager-internal services, among them org.apache.flink.runtime.taskexecutor.JobLeaderService.
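A pattern worth noting here, shown as a generic, hypothetical sketch rather than Flink code: service aggregates like this are typically created and started in dependency order and torn down in reverse order, so that a half-initialized TaskManager does not leak threads, ports, or temp directories.

import java.util.ArrayDeque;
import java.util.Deque;

interface Service extends AutoCloseable {
    void start() throws Exception;
}

class ServiceStack implements AutoCloseable {
    private final Deque<Service> started = new ArrayDeque<>();

    void startAll(Service... services) throws Exception {
        try {
            for (Service s : services) {
                s.start();
                started.push(s); // remember for reverse-order shutdown
            }
        } catch (Exception e) {
            close(); // roll back everything that already started
            throw e;
        }
    }

    @Override
    public void close() throws Exception {
        Exception first = null;
        while (!started.isEmpty()) {
            try {
                started.pop().close();
            } catch (Exception e) {
                if (first == null) first = e; else first.addSuppressed(e);
            }
        }
        if (first != null) throw first;
    }
}

public class ServiceStackDemo {
    public static void main(String[] args) throws Exception {
        try (ServiceStack stack = new ServiceStack()) {
            stack.startAll(named("ioManager"), named("shuffleEnvironment"), named("kvStateService"));
        } // closed in reverse order: kvStateService, shuffleEnvironment, ioManager
    }

    static Service named(String name) {
        return new Service() {
            public void start() { System.out.println("start " + name); }
            public void close() { System.out.println("close " + name); }
        };
    }
}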

At this point the TaskManager is fully initialized and started. Let's go back to the org.apache.flink.runtime.minicluster.MiniCluster#submitJob method:

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
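The future chaining above deserves a note: thenCombine produces a CompletableFuture<CompletableFuture<Acknowledge>> because the dispatcher gateway call itself returns a future, and thenCompose(Function.identity()) flattens that nesting. A tiny self-contained illustration of the same pattern:

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class FutureFlattenDemo {
    // stands in for dispatcherGateway.submitJob(jobGraph, rpcTimeout)
    static CompletableFuture<String> submit(String gateway) {
        return CompletableFuture.completedFuture("ack from " + gateway);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> jarUpload = CompletableFuture.completedFuture(null);
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("dispatcher");

        CompletableFuture<String> ack = jarUpload
            .thenCombine(gateway, (ignored, gw) -> submit(gw)) // CompletableFuture<CompletableFuture<String>>
            .thenCompose(Function.identity());                 // flattened to CompletableFuture<String>

        System.out.println(ack.join()); // -> ack from dispatcher
    }
}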

org.apache.flink.runtime.dispatcher.Dispatcher#submitJob:

@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                        "resources configured. The limitation will be removed in future versions."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}

org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob:

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
        .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
        if (throwable != null) {
            cleanUpJobData(jobGraph.getJobID(), true);
            // ...

Next we enter the org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob method:

private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);

    final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

    return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
        if (throwable != null) {
            jobGraphWriter.removeJobGraph(jobGraph.getJobID());
        }
    }));
}

org.apache.flink.runtime.dispatcher.Dispatcher#runJob:

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}

org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner:

private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());
}

A JobManagerRunnerImpl instance is created via org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory#createJobManagerRunner:

public JobManagerRunnerImpl(
        final JobGraph jobGraph,
        final JobMasterServiceFactory jobMasterFactory,
        final HighAvailabilityServices haServices,
        final LibraryCacheManager libraryCacheManager,
        final Executor executor,
        final FatalErrorHandler fatalErrorHandler) throws Exception {

    this.resultFuture = new CompletableFuture<>();
    this.terminationFuture = new CompletableFuture<>();
    this.leadershipOperation = CompletableFuture.completedFuture(null);

    // make sure we cleanly shut down out JobManager services if initialization fails
    try {
        this.jobGraph = checkNotNull(jobGraph);
        this.libraryCacheManager = checkNotNull(libraryCacheManager);
        this.executor = checkNotNull(executor);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

        // libraries and class loader first
        try {
            libraryCacheManager.registerJob(
                jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        } catch (IOException e) {
            throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
        }

        final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
        if (userCodeLoader == null) {
            throw new Exception("The user code class loader could not be initialized.");
        }

        // high availability services next
        this.runningJobsRegistry = haServices.getRunningJobsRegistry();
        this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

        this.leaderGatewayFuture = new CompletableFuture<>();

        // note: this org.apache.flink.runtime.jobmaster.JobMasterService is not the jobMasterService found in the TaskManager
        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
    }
    // ...

jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader):

@Override
public JobMaster createJobMasterService(
        JobGraph jobGraph,
        OnCompletionActions jobCompletionActions,
        ClassLoader userCodeClassloader) throws Exception {

    return new JobMaster(
        rpcService,
        jobMasterConfiguration,
        ResourceID.generate(),
        jobGraph,
        haServices,
        slotPoolFactory,
        schedulerFactory,
        jobManagerSharedServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        jobCompletionActions,
        fatalErrorHandler,
        userCodeClassloader,
        schedulerNGFactory,
        shuffleMaster,
        lookup -> new JobMasterPartitionTrackerImpl(
            jobGraph.getJobID(),
            shuffleMaster,
            lookup
        ));
}

This is where the JobMaster is created.

org.apache.flink.runtime.dispatcher.Dispatcher#startJobManagerRunner:

private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // check if we are still the active JobManagerRunner by checking the identity
                final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                    .map(future -> future.getNow(null))
                    .orElse(null);
                //noinspection ObjectEquality
                // ...
            }, getMainThreadExecutor()));

    jobManagerRunner.start();

    return jobManagerRunner;
}

This is where the JobMaster is started.

In local mode, the client, the TaskManager, and the JobMaster each start an actor (RPC) service of their own, and these services then communicate with one another, e.g. for heartbeats and task deployment.
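That communication style can be illustrated with the gateway pattern Flink's RPC layer is built around: each component exposes an interface (a "gateway"), and callers hold a proxy whose method calls become asynchronous messages processed in the endpoint's single main thread. The sketch below is a deliberately simplified, in-process stand-in with hypothetical method names, not Flink's actual signatures:

import java.util.concurrent.CompletableFuture;

interface TaskExecutorGateway {
    CompletableFuture<Boolean> heartbeatFromJobMaster(String jobMasterId);
    CompletableFuture<Boolean> submitTask(String taskDescription);
}

// In-process stand-in for the real actor-backed proxy: the real RpcService
// would marshal these calls through Akka to the endpoint's main thread.
class LocalTaskExecutor implements TaskExecutorGateway {
    @Override
    public CompletableFuture<Boolean> heartbeatFromJobMaster(String jobMasterId) {
        System.out.println("heartbeat from " + jobMasterId);
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public CompletableFuture<Boolean> submitTask(String taskDescription) {
        System.out.println("deploying task: " + taskDescription);
        return CompletableFuture.completedFuture(true);
    }
}

public class GatewayDemo {
    public static void main(String[] args) {
        TaskExecutorGateway gateway = new LocalTaskExecutor();
        gateway.submitTask("WordCount flatMap (1/1)").join();
        gateway.heartbeatFromJobMaster("jm-1").join();
    }
}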


