
Flink Source Code Analysis 5: Creating the Dispatcher (45)

beenrun 2023-02-19

Summary of RPC communication in Flink

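(1) RpcGateway: the gateway used to route calls, and the root of the RPC hierarchy. All RPC components are subtypes of RpcGateway.
(2) RpcServer: the glue layer between RpcService and RpcEndpoint.
(3) RpcEndpoint: carries the business logic and wraps an Actor. Whenever an endpoint is started (start()), its onStart() method is called, which you can think of as its initialization step.
(4) RpcService: wraps the ActorSystem.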
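The point about onStart() can be illustrated with a simplified, hypothetical model. This is not Flink's API. MiniRpcEndpoint, DemoEndpoint and runAsync are illustrative names. A single-thread executor stands in for the actor's mailbox. start() returns a future here only so callers can wait on it. The model shows that start() merely enqueues a message, and onStart() then runs on the endpoint's main thread before any later message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified endpoint; NOT Flink's RpcEndpoint. */
abstract class MiniRpcEndpoint {
    // Stands in for the actor mailbox: a single thread processes messages in order.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** Only enqueues a "start" message; onStart() runs later on the main thread. */
    final CompletableFuture<Void> start() {
        return runAsync(() -> {
            try {
                onStart();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        });
    }

    /** Enqueues an action, like sending a message to the endpoint. */
    final CompletableFuture<Void> runAsync(Runnable action) {
        return CompletableFuture.runAsync(action, mainThread);
    }

    final void shutdown() {
        mainThread.shutdown();
    }

    /** Initialization hook, analogous to RpcEndpoint#onStart(). */
    protected abstract void onStart() throws Exception;
}

/** Example endpoint that records what ran on its main thread. */
class DemoEndpoint extends MiniRpcEndpoint {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void onStart() {
        events.add("onStart");
    }
}
```

Suppose you call start() and then runAsync(...) on a DemoEndpoint. The endpoint records "onStart" before the message, because both calls go through the same single-threaded queue.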

The complete Dispatcher creation process


Entry point: creating the DispatcherRunner

The create method in DefaultDispatcherResourceManagerComponentFactory:

    // Create the DispatcherRunner, which internally creates and starts the Dispatcher
    dispatcherRunner =
            dispatcherRunnerFactory.createDispatcherRunner(
                    highAvailabilityServices.getDispatcherLeaderElectionService(),
                    fatalErrorHandler,
                    new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices);

    dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);



Creating the DispatcherRunner (DefaultDispatcherRunnerFactory#createDispatcherRunner)

    @Override
    public DispatcherRunner createDispatcherRunner(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            JobPersistenceComponentFactory jobPersistenceComponentFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {

        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobPersistenceComponentFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);

        return DefaultDispatcherRunner.create(
                leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }

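Creating the DefaultDispatcherRunner directly with new

This creates the DefaultDispatcherRunner object and starts the DispatcherRunner lifecycle. The leaderElectionService is responsible for running the leader election on behalf of the dispatcherRunner.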

    public static DispatcherRunner create(
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
        // The leaderElectionService runs the election for this runner
        return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
                dispatcherRunner, leaderElectionService);
    }

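Wrapping the runner in a static factory method

The runner is wrapped in a DispatcherRunnerLeaderElectionLifecycleManager, which ties the runner's lifecycle to the leaderElectionService: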

    public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        return new DispatcherRunnerLeaderElectionLifecycleManager<>(
                dispatcherRunner, leaderElectionService);
    }

Starting the election for the dispatcherRunner

The constructor calls leaderElectionService.start(), passing in dispatcherRunner as the contender:

    private DispatcherRunnerLeaderElectionLifecycleManager(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        this.dispatcherRunner = dispatcherRunner;
        this.leaderElectionService = leaderElectionService;
        /**
         * Start the election with dispatcherRunner as the contender:
         * the leaderContender held by the election service is the DefaultDispatcherRunner.
         */
        leaderElectionService.start(dispatcherRunner);
    }

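The start method is implemented in StandaloneLeaderElectionService.

This is the same election process covered in the earlier parts. It is also used when electing the ResourceManager and WebMonitorEndpoint components. Here it is the DispatcherRunner that runs for leadership, so the contender parameter is the DispatcherRunner object.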

    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }

        contender = Preconditions.checkNotNull(newContender);

        // directly grant leadership to the given contender
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
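The standalone election comes down to one rule: the only candidate wins immediately. Below is a minimal sketch of that contract. Contender and SimpleStandaloneElection are hypothetical names modeled on LeaderContender and StandaloneLeaderElectionService. The all-zero UUID is assumed here as a stand-in for HighAvailabilityServices.DEFAULT_LEADER_ID.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical minimal contender, modeled on Flink's LeaderContender. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

/** Sketch of a standalone election: there is a single candidate, so it wins at once. */
class SimpleStandaloneElection {
    // Assumed fixed session id, standing in for HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private Contender contender;

    void start(Contender newContender) {
        if (contender != null) {
            // Starting twice is a programming error, as in the original service.
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // No real election: grant leadership immediately with the fixed id.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```

With HA services such as ZooKeeper, the grant would instead arrive asynchronously once the election is won. The contender-side code (grantLeadership) stays the same either way.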

              DefaultDispatcherRunner#grantLeadership

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

Starting a new DispatcherLeaderProcess

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        // First stop the existing leader process, if any
        stopDispatcherLeaderProcess();
        // Create a new DispatcherLeaderProcess
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        // Start the new process only after the previous one has terminated
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        newDispatcherLeaderProcess::start));
    }
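This hand-over has three steps: stop the old process, create the new one at once, and start the new one only after the old one has terminated. It can be sketched with plain CompletableFuture chaining. LeaderProcess and LeaderProcessSwitcher are hypothetical classes. In this sketch the old process's termination is completed manually, so the ordering is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a DispatcherLeaderProcess. */
class LeaderProcess {
    private final UUID sessionId;
    private final List<String> log;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    LeaderProcess(UUID sessionId, List<String> log) {
        this.sessionId = sessionId;
        this.log = log;
    }

    void start() {
        log.add("start " + sessionId);
    }

    /** Begins shutting down; the returned future completes once shutdown has finished. */
    CompletableFuture<Void> closeAsync() {
        log.add("close " + sessionId);
        return terminationFuture;
    }

    /** Simulates the asynchronous shutdown finishing. */
    void completeTermination() {
        terminationFuture.complete(null);
    }
}

/** Hypothetical runner that replaces its leader process on every leadership grant. */
class LeaderProcessSwitcher {
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private LeaderProcess current;
    private CompletableFuture<Void> previousTermination = CompletableFuture.completedFuture(null);

    void grantLeadership(UUID sessionId) {
        // 1. Stop the existing process and remember when it has fully terminated.
        if (current != null) {
            previousTermination = current.closeAsync();
        }
        // 2. Create the new process right away ...
        current = new LeaderProcess(sessionId, log);
        final LeaderProcess next = current;
        // 3. ... but start it only once the previous process has terminated.
        previousTermination.thenRun(next::start);
    }

    LeaderProcess current() {
        return current;
    }
}
```

Chaining on the termination future, instead of blocking on it, keeps the thread that handles the leadership grant free. The new process's start is simply deferred until the old process has finished.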

The start method of AbstractDispatcherLeaderProcess

    @Override
    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        // Switch the state to RUNNING (the Dispatcher itself is created later, from onStart())
        state = State.RUNNING;
        // Implemented by SessionDispatcherLeaderProcess#onStart
        onStart();
    }

Starting the JobGraphStore and the Dispatcher (SessionDispatcherLeaderProcess#onStart)

    @Override
    protected void onStart() {
        // Start services: the JobGraphStore, the component that stores JobGraphs
        startServices();
        // Recover jobs, then create and start the Dispatcher (asynchronously)
        onGoingRecoveryOperation =
                createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
    }

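Creating the Dispatcher

First, the dirty job results (jobs that finished but were not yet cleaned up) are loaded on the ioExecutor. Their job IDs are then passed to recoverJobsIfRunning. Once both the recovered job graphs and the dirty results are available, the Dispatcher is created: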

    private CompletableFuture<Void>
            createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);

        return dirtyJobsFuture
                .thenApplyAsync(
                        dirtyJobs ->
                                this.recoverJobsIfRunning(
                                        dirtyJobs.stream()
                                                .map(JobResult::getJobId)
                                                .collect(Collectors.toSet())),
                        ioExecutor)
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }
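The pipeline has four stages: load A on the I/O pool, derive B from A, combine A and B, then handle errors. This shape can be reproduced with the JDK alone. The sketch below is hypothetical: job IDs are plain strings, and loadDirtyJobIds and recoverJobs return fixed data. It uses thenCombine instead of thenAcceptBoth only so that the combined result can be returned and inspected. The filtering step assumes that jobs whose results are already dirty are not recovered again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/** Hypothetical model of the recovery pipeline; job IDs are plain strings. */
class RecoveryPipeline {
    // Hypothetical: jobs that finished but whose cleanup did not complete ("dirty" results).
    static List<String> loadDirtyJobIds() {
        return Arrays.asList("job-2");
    }

    // Hypothetical: all persisted jobs, minus the ones that already have a dirty result.
    static List<String> recoverJobs(Set<String> dirtyJobIds) {
        return Arrays.asList("job-1", "job-2", "job-3").stream()
                .filter(id -> !dirtyJobIds.contains(id))
                .collect(Collectors.toList());
    }

    static CompletableFuture<String> createDispatcher(Executor ioExecutor) {
        // Step 1: load the dirty job results on the I/O executor.
        final CompletableFuture<List<String>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(RecoveryPipeline::loadDirtyJobIds, ioExecutor);

        return dirtyJobsFuture
                // Step 2: recover the job graphs, passing the dirty job IDs along.
                .thenApplyAsync(
                        dirty -> recoverJobs(dirty.stream().collect(Collectors.toSet())),
                        ioExecutor)
                // Step 3: once both are available, "create" the dispatcher from them.
                .thenCombine(
                        dirtyJobsFuture,
                        (jobs, dirty) -> "dispatcher(jobs=" + jobs + ", dirty=" + dirty + ")")
                // Step 4: turn any failure into a value instead of an exceptional future.
                .handle((result, error) -> error == null ? result : "failed: " + error);
    }
}
```

Reusing dirtyJobsFuture in step 3 does not load the data twice. A CompletableFuture caches its result, and every dependent stage sees the same value.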

Creating the Dispatcher only if the process is still running

    private void createDispatcherIfRunning(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
    }

    final void runIfStateIs(State expectedState, Runnable action) {
        runIfState(expectedState::equals, action);
    }

    private void runIfState(Predicate<State> actionPredicate, Runnable action) {
        synchronized (lock) {
            if (actionPredicate.test(state)) {
                action.run();
            }
        }
    }
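runIfStateIs is a small but important guard. Recovery completes asynchronously, so by the time the Dispatcher would be created, the process may already have been stopped (for example, after losing leadership). Here is a minimal, hypothetical sketch of the same lock-plus-predicate pattern. GuardedProcess and dispatchersCreated are illustrative names.

```java
import java.util.function.Predicate;

/** Hypothetical process whose actions only run in the expected state. */
class GuardedProcess {
    enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;
    int dispatchersCreated = 0;

    void start() {
        // Only a CREATED process can be started; repeated calls are ignored.
        runIfStateIs(State.CREATED, () -> state = State.RUNNING);
    }

    void stop() {
        synchronized (lock) {
            state = State.STOPPED;
        }
    }

    /** Called when asynchronous recovery finishes; ignored unless still RUNNING. */
    void onRecoveryFinished() {
        runIfStateIs(State.RUNNING, () -> dispatchersCreated++);
    }

    private void runIfStateIs(State expected, Runnable action) {
        runIfState(expected::equals, action);
    }

    private void runIfState(Predicate<State> predicate, Runnable action) {
        // Check and act under the same lock so the state cannot change in between.
        synchronized (lock) {
            if (predicate.test(state)) {
                action.run();
            }
        }
    }
}
```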

Creating the Dispatcher

The DispatcherId is derived from the leader session ID:

    private void createDispatcher(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {

        final DispatcherGatewayService dispatcherService =
                dispatcherGatewayServiceFactory.create(
                        // Wrap the leader session ID as the DispatcherId (the fencing token)
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        jobGraphs,
                        recoveredDirtyJobResults,
                        jobGraphStore,
                        jobResultStore);

        completeDispatcherSetup(dispatcherService);
    }

The creation goes through an interface

Its implementation is DefaultDispatcherGatewayServiceFactory:

    public interface DispatcherGatewayServiceFactory {
        DispatcherGatewayService create(
                DispatcherId dispatcherId,
                Collection<JobGraph> recoveredJobs,
                Collection<JobResult> recoveredDirtyJobResults,
                JobGraphWriter jobGraphWriter,
                JobResultStore jobResultStore);
    }


Finally creating the Dispatcher

The important part is how dispatcherFactory.createDispatcher builds it, so let's follow that next:

    @Override
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            JobGraphWriter jobGraphWriter,
            JobResultStore jobResultStore) {

        final Dispatcher dispatcher;

        try {
            // Create the Dispatcher; in session mode dispatcherFactory is SessionDispatcherFactory
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            recoveredDirtyJobResults,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobPersistenceComponents.from(
                                    partialDispatcherServices, jobGraphWriter, jobResultStore));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }
        // Start the Dispatcher: RpcEndpoint#start sends a start message to the endpoint
        // itself, which then invokes onStart()
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

SessionDispatcherFactory is an enum-based singleton that creates the Dispatcher:

    public enum SessionDispatcherFactory implements DispatcherFactory {
        INSTANCE;

        @Override
        public StandaloneDispatcher createDispatcher(
                RpcService rpcService,
                DispatcherId fencingToken,
                Collection<JobGraph> recoveredJobs,
                Collection<JobResult> recoveredDirtyJobResults,
                DispatcherBootstrapFactory dispatcherBootstrapFactory,
                PartialDispatcherServicesWithJobPersistenceComponents
                        partialDispatcherServicesWithJobPersistenceComponents)
                throws Exception {
            // create the default dispatcher
            return new StandaloneDispatcher(
                    rpcService,
                    fencingToken,
                    recoveredJobs,
                    recoveredDirtyJobResults,
                    dispatcherBootstrapFactory,
                    DispatcherServices.from(
                            partialDispatcherServicesWithJobPersistenceComponents,
                            JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                            CheckpointResourcesCleanupRunnerFactory.INSTANCE));
        }
    }
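A Java enum with a single constant is an idiomatic way to get a stateless, thread-safe singleton that the JVM instantiates exactly once. That is presumably why factories such as SessionDispatcherFactory.INSTANCE are written this way. Below is a minimal sketch of the pattern. EndpointFactory, DispatcherStub and SessionStubFactory are hypothetical names.

```java
/** Hypothetical factory interface, in the spirit of DispatcherFactory. */
interface EndpointFactory {
    DispatcherStub create(String fencingToken);
}

/** Hypothetical product; each call to the factory creates a new one. */
class DispatcherStub {
    final String fencingToken;

    DispatcherStub(String fencingToken) {
        this.fencingToken = fencingToken;
    }
}

/** The factory itself is a singleton: the enum has exactly one constant. */
enum SessionStubFactory implements EndpointFactory {
    INSTANCE;

    @Override
    public DispatcherStub create(String fencingToken) {
        return new DispatcherStub(fencingToken);
    }
}
```

The factory object is shared, but each create(...) call still returns a new DispatcherStub, much as each leadership grant gets its own Dispatcher.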

The Dispatcher constructor

    /**
     * After the Dispatcher is started via dispatcher.start(), its onStart() method is invoked.
     */
    public Dispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            DispatcherServices dispatcherServices)
            throws Exception {
        this(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobs,
                dispatcherBootstrapFactory,
                dispatcherServices,
                new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
    }

Executing the Dispatcher's onStart method

    @Override
    public void onStart() throws Exception {
        try {
            // Start the Dispatcher services
            startDispatcherServices();
        } catch (Throwable t) {
            final DispatcherException exception =
                    new DispatcherException(
                            String.format("Could not start the Dispatcher %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        // Retry the cleanup of jobs whose results were recovered as dirty
        startCleanupRetries();
        // Resume the interrupted jobs recovered from the JobGraphStore
        startRecoveredJobs();
        // Create the DispatcherBootstrap (a NoOpDispatcherBootstrap in session mode)
        this.dispatcherBootstrap =
                this.dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        this.getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }




    // Start the Dispatcher services
    private void startDispatcherServices() throws Exception {
        try {
            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }

    private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
        jobManagerMetricGroup.gauge(
                MetricNames.NUM_RUNNING_JOBS,
                // metrics can be called from anywhere and therefore, have to run without the main
                // thread safeguard being triggered. For metrics, we can afford to be not 100%
                // accurate
                () -> (long) jobManagerRunnerRegistry.getWrappedDelegate().size());
    }
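A gauge is a callback evaluated whenever the metric is read, so it always reports the current registry size without extra bookkeeping. The sketch below mimics this with java.util.function.Supplier. MiniMetricGroup, MiniJobRegistry and the metric name "numRunningJobs" are illustrative assumptions, not Flink's MetricGroup/Gauge API. A ConcurrentHashMap lets size() be called from any thread, and under concurrent updates the value may be slightly stale. That matches the remark in the code above that metrics need not be 100% accurate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical metric group: gauges are callbacks, evaluated on every read. */
class MiniMetricGroup {
    private final Map<String, Supplier<Long>> gauges = new ConcurrentHashMap<>();

    void gauge(String name, Supplier<Long> gauge) {
        gauges.put(name, gauge);
    }

    long read(String name) {
        return gauges.get(name).get();
    }
}

/** Hypothetical job registry, safe to read from any thread. */
class MiniJobRegistry {
    final Map<String, Object> runners = new ConcurrentHashMap<>();
}
```

You register the gauge once, for example with metrics.gauge("numRunningJobs", () -> (long) registry.runners.size()). Every later read then reflects the registry at that moment.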

Recovering all interrupted jobs

After the recovered jobs have been started, recoveredJobs is cleared. The key step here is runRecoveredJob:

    private void startRecoveredJobs() {
        for (JobGraph recoveredJob : recoveredJobs) {
            runRecoveredJob(recoveredJob);
        }
        recoveredJobs.clear();
    }

    private void runRecoveredJob(final JobGraph recoveredJob) {
        checkNotNull(recoveredJob);
        try {
            runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
        } catch (Throwable throwable) {
            onFatalError(
                    new DispatcherException(
                            String.format(
                                    "Could not start recovered job %s.", recoveredJob.getJobID()),
                            throwable));
        }
    }
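The recovery loop follows a simple rule. It tries to start every recovered job. A failure for an individual job is reported as a fatal error rather than aborting the loop. The recovered list is dropped afterwards. The sketch below is hypothetical and uses string job IDs. runJob fails for IDs containing "bad". Fatal errors are collected in a list instead of being handed to a fatal error handler, which in a real cluster would usually bring the process down.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of Dispatcher#startRecoveredJobs / #runRecoveredJob. */
class RecoveredJobStarter {
    final List<String> recoveredJobs;
    final List<String> startedJobs = new ArrayList<>();
    final List<Throwable> fatalErrors = new ArrayList<>();

    RecoveredJobStarter(List<String> recoveredJobs) {
        // Copy so that clear() below works even for fixed-size input lists.
        this.recoveredJobs = new ArrayList<>(recoveredJobs);
    }

    void startRecoveredJobs() {
        for (String job : recoveredJobs) {
            runRecoveredJob(job);
        }
        // The recovered jobs have been handed over; the list is no longer needed.
        recoveredJobs.clear();
    }

    private void runRecoveredJob(String job) {
        try {
            runJob(job);
        } catch (Throwable t) {
            // Stand-in for onFatalError(new DispatcherException(...)).
            fatalErrors.add(new IllegalStateException("Could not start recovered job " + job + ".", t));
        }
    }

    // Hypothetical: job IDs containing "bad" fail to start.
    private void runJob(String job) {
        if (job.contains("bad")) {
            throw new IllegalArgumentException("cannot start " + job);
        }
        startedJobs.add(job);
    }
}
```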

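Executing runJob

A normally submitted job takes the same path. When a client submits a job, the Dispatcher receives it, creates a JobManagerRunner and starts it. The runner launches the job's master (the JobMaster), which requests resources from the ResourceManager.
Note: here, JobManagerRunners for the recovered jobs are already started while the Dispatcher itself is starting.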

    private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType)
            throws Exception {
        jobManagerRunner.start();
        jobManagerRunnerRegistry.register(jobManagerRunner);

        final JobID jobId = jobManagerRunner.getJobID();

        final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
                jobManagerRunner
                        .getResultFuture()
                        .handleAsync(
                                (jobManagerRunnerResult, throwable) -> {
                                    Preconditions.checkState(
                                            jobManagerRunnerRegistry.isRegistered(jobId)
                                                    && jobManagerRunnerRegistry.get(jobId)
                                                            == jobManagerRunner,
                                            "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");

                                    if (jobManagerRunnerResult != null) {
                                        return handleJobManagerRunnerResult(
                                                jobManagerRunnerResult, executionType);
                                    } else {
                                        return CompletableFuture.completedFuture(
                                                jobManagerRunnerFailed(
                                                        jobId, JobStatus.FAILED, throwable));
                                    }
                                },
                                getMainThreadExecutor())
                        .thenCompose(Function.identity());

        final CompletableFuture<Void> jobTerminationFuture =
                cleanupJobStateFuture.thenCompose(
                        cleanupJobState ->
                                removeJob(jobId, cleanupJobState)
                                        .exceptionally(
                                                throwable ->
                                                        logCleanupErrorWarning(jobId, throwable)));

        FutureUtils.handleUncaughtException(
                jobTerminationFuture,
                (thread, throwable) -> fatalErrorHandler.onFatalError(throwable));
        registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
    }
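Stripped of Flink specifics, runJob registers a started runner and attaches a chain to its result future. Whether the job finished or the runner failed, the outcome is mapped to a cleanup state, the job is removed, and the resulting termination future is kept per job. The sketch below is hypothetical and single-threaded. MiniRunner and MiniDispatcher are illustrative names, and cleanup states are plain strings. Flink runs the handler on the main thread executor via handleAsync, which this sketch omits.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical job runner whose result future completes when the job ends. */
class MiniRunner {
    final String jobId;
    final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    boolean started;

    MiniRunner(String jobId) {
        this.jobId = jobId;
    }

    void start() {
        started = true;
    }
}

/** Hypothetical dispatcher keeping a registry of running jobs. */
class MiniDispatcher {
    final Map<String, MiniRunner> registry = new ConcurrentHashMap<>();
    final Map<String, String> cleanupStates = new ConcurrentHashMap<>();
    final Map<String, CompletableFuture<Void>> terminationFutures = new ConcurrentHashMap<>();

    void runJob(MiniRunner runner) {
        runner.start();
        registry.put(runner.jobId, runner);
        final String jobId = runner.jobId;

        // Map the job outcome (a result or a failure) to a cleanup state ...
        CompletableFuture<String> cleanupState =
                runner.resultFuture.handle(
                        (result, error) -> error == null ? "finished: " + result : "failed");

        // ... then remove the job and remember when that has happened.
        CompletableFuture<Void> termination =
                cleanupState.thenAccept(
                        state -> {
                            cleanupStates.put(jobId, state);
                            registry.remove(jobId);
                        });
        terminationFutures.put(jobId, termination);
    }
}
```

Using handle (rather than thenApply) is what lets a failed runner still reach the cleanup step. The exception becomes an ordinary value instead of skipping the rest of the chain.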

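Summary

This article walked through how the Dispatcher is created, following the main path: interrupted jobs are recovered and restarted, and then jobs are run. With these five parts, the whole server-side startup has now been covered.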

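Flink Source Code Analysis 1: Script Entry Point (41)

Flink Source Code Analysis 2: JobManager Initialization (42)

Flink Source Code Analysis 3: Starting the WebMonitorEndpoint (42)

Flink Source Code Analysis 4: Creating and Starting the ResourceManager (44)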

Thanks for reading. Likes, shares and follows are much appreciated.

Miracles often happen just when you hold on a little longer!


This article is reposted from beenrun.
