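Summary of RPC Communication in Flink
(1) RpcGateway: the routing handle and the root of the RPC hierarchy; the various RPC components are all subtypes of RpcGateway.
(2) RpcServer: the glue layer between RpcService and RpcEndpoint.
(3) RpcEndpoint: the carrier of business logic and the wrapper around an Actor. Whenever an endpoint is created, keep in mind that its onStart() method will be called; think of it as the initialization step.
(4) RpcService: the wrapper around the ActorSystem.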
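The onStart() behaviour of an endpoint can be pictured with a very small sketch. The classes below (SketchEndpoint, SketchDispatcherEndpoint) are hypothetical stand-ins, not Flink classes, and the sketch calls the hook directly on the caller's thread, whereas a real endpoint is started through a message sent to itself.

```java
import java.util.ArrayList;
import java.util.List;

class EndpointLifecycleDemo {
    // Creates an endpoint, starts it, and returns the recorded lifecycle events.
    static List<String> run() {
        SketchDispatcherEndpoint endpoint = new SketchDispatcherEndpoint();
        endpoint.events.add("created");
        endpoint.start();
        return endpoint.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical stand-in for an RPC endpoint (assumption: not Flink's RpcEndpoint).
abstract class SketchEndpoint {
    private boolean started;

    // Lifecycle hook: subclasses put their initialization logic here.
    protected abstract void onStart();

    // Simplification: the hook runs synchronously instead of via a self-message.
    final void start() {
        if (started) {
            throw new IllegalStateException("Endpoint already started.");
        }
        started = true;
        onStart();
    }
}

// Hypothetical endpoint that only records that its hook was invoked.
class SketchDispatcherEndpoint extends SketchEndpoint {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onStart() {
        events.add("onStart");
    }
}
```

The point of the sketch is only the ordering: construction comes first, and the initialization logic runs later, when the endpoint is started.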
The Complete Dispatcher Creation Process

Entry point: creating the dispatcherRunner
The create method of the DefaultDispatcherResourceManagerComponentFactory class
// Create the DispatcherRunner, which creates and starts the Dispatcher internally
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
Creating the DispatcherRunner
@Override
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices)
        throws Exception {
    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobPersistenceComponentFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler);
    return DefaultDispatcherRunner.create(
            leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
}
The DefaultDispatcherRunner is created directly with new
This creates the DefaultDispatcherRunner object and begins the DispatcherRunner lifecycle.
The leaderElectionService helps the dispatcherRunner contender complete the leader election.
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
        throws Exception {
    final DefaultDispatcherRunner dispatcherRunner =
            new DefaultDispatcherRunner(
                    leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
    // leaderElectionService helps the contender complete the election
    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
            dispatcherRunner, leaderElectionService);
}
A static method creates the DispatcherRunner
The leaderElectionService helps the dispatcherRunner take part in the election.
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(
            dispatcherRunner, leaderElectionService);
}
Starting the dispatcherRunner election
The constructor calls leaderElectionService.start and passes dispatcherRunner as the argument.
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;
    /*
     * Start the election; the argument dispatcherRunner is a DispatcherRunner.
     * The leaderContender held inside the election service is the DefaultDispatcherRunner.
     */
    leaderElectionService.start(dispatcherRunner);
}
The start method is implemented in the StandaloneLeaderElectionService class
As the code shows, this is the election process we covered earlier; the same method is used when electing the ResourceManager and WebMonitorEndpoint components.
Here it is the DispatcherRunner that stands for election.
The contender parameter is in fact the DispatcherRunner object.
@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }
    contender = Preconditions.checkNotNull(newContender);
    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
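The standalone "election" above has no competition at all: the single contender is handed leadership as soon as the service starts. A minimal sketch of that idea follows; SketchContender, SketchStandaloneElection, RecordingContender and the fixed SKETCH_LEADER_ID are hypothetical names chosen for illustration, not Flink's API.

```java
import java.util.Objects;
import java.util.UUID;

class StandaloneElectionDemo {
    public static void main(String[] args) {
        RecordingContender contender = new RecordingContender();
        new SketchStandaloneElection().start(contender);
        System.out.println("granted leader id: " + contender.grantedId);
    }
}

// Hypothetical contender interface with a single callback.
interface SketchContender {
    void grantLeadership(UUID leaderSessionId);
}

// Hypothetical standalone election: no competition, one contender.
class SketchStandaloneElection {
    // Assumption: a fixed placeholder id used only by this sketch.
    static final UUID SKETCH_LEADER_ID = new UUID(0L, 0L);

    private SketchContender contender;

    void start(SketchContender newContender) {
        if (contender != null) {
            throw new IllegalStateException("Election service cannot be started twice.");
        }
        contender = Objects.requireNonNull(newContender);
        // Leadership is granted immediately, on the calling thread.
        contender.grantLeadership(SKETCH_LEADER_ID);
    }
}

// Contender that simply remembers the id it was granted.
class RecordingContender implements SketchContender {
    UUID grantedId;

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        grantedId = leaderSessionId;
    }
}
```

Because the grant happens inside start, everything the contender does in its callback runs before start returns in this sketch.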
DefaultDispatcherRunner#grantLeadership
@Override
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());
                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}
Starting a new DispatcherLeaderProcess
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    // First stop the existing leader process, if any
    stopDispatcherLeaderProcess();
    // Create a new DispatcherLeaderProcess
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    // Start the new one once the previous one has terminated
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(
                    newDispatcherLeaderProcess::start));
}
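The "stop the old one, then start the new one" ordering relies on chaining the start onto the previous process's termination future. Here is a rough sketch of that ordering using only java.util.concurrent; the class name and the log strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LeaderProcessSwapDemo {
    // Returns the order in which the two steps actually happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Stands in for the termination future of the previous process.
        CompletableFuture<Void> previousTermination = new CompletableFuture<>();

        // The new process is registered to start only after that future completes.
        CompletableFuture<Void> started =
                previousTermination.thenRun(() -> log.add("new process started"));

        // The old process finishes shutting down some time later.
        log.add("old process terminated");
        previousTermination.complete(null);

        started.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Even though the start action is registered first, it only runs once the termination future completes, so two leader processes never run side by side in this sketch.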
The start method of the AbstractDispatcherLeaderProcess class
@Override
public final void start() {
    runIfStateIs(State.CREATED, this::startInternal);
}

private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    // Switch the state to RUNNING; the leader process is now started
    state = State.RUNNING;
    // onStart is implemented by the SessionDispatcherLeaderProcess class
    onStart();
}
Starting the JobGraphStore and the Dispatcher
@Override
protected void onStart() {
    // Start services: bring up the JobGraphStore, the component that stores job graphs
    startServices();
    // Start the Dispatcher
    onGoingRecoveryOperation =
            createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
}
Creating the Dispatcher
private CompletableFuture<Void>
        createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
    final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
            CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);
    return dirtyJobsFuture
            .thenApplyAsync(
                    dirtyJobs ->
                            this.recoverJobsIfRunning(
                                    dirtyJobs.stream()
                                            .map(JobResult::getJobId)
                                            .collect(Collectors.toSet())),
                    ioExecutor)
            .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
            .handle(this::onErrorIfRunning);
}
Creating the Dispatcher only while the process is RUNNING
private void createDispatcherIfRunning(
        Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
    runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
}

final void runIfStateIs(State expectedState, Runnable action) {
    runIfState(expectedState::equals, action);
}

private void runIfState(Predicate<State> actionPredicate, Runnable action) {
    // The state check and the action run under the same lock
    synchronized (lock) {
        if (actionPredicate.test(state)) {
            action.run();
        }
    }
}
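The runIfStateIs helper is a small state guard: an action runs only if the object is in the expected state, and both the check and the action happen under one lock. A self-contained sketch of the same pattern is below; SketchLeaderProcess and its executed counter are hypothetical and exist only to make the behaviour observable.

```java
class StateGuardDemo {
    public static void main(String[] args) {
        SketchLeaderProcess process = new SketchLeaderProcess();
        // Skipped: the process is still CREATED.
        process.runIfStateIs(SketchLeaderProcess.State.RUNNING, () -> process.executed++);
        process.start();
        // Runs: the process is RUNNING now.
        process.runIfStateIs(SketchLeaderProcess.State.RUNNING, () -> process.executed++);
        process.close();
        // Skipped again: the process is STOPPED.
        process.runIfStateIs(SketchLeaderProcess.State.RUNNING, () -> process.executed++);
        System.out.println("actions executed: " + process.executed);
    }
}

// Hypothetical process with a lock-protected lifecycle state.
class SketchLeaderProcess {
    enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;
    int executed = 0;

    // Runs the action only if the current state matches, atomically with the check.
    void runIfStateIs(State expected, Runnable action) {
        synchronized (lock) {
            if (state == expected) {
                action.run();
            }
        }
    }

    // Only a CREATED process can move to RUNNING.
    void start() {
        runIfStateIs(State.CREATED, () -> state = State.RUNNING);
    }

    void close() {
        synchronized (lock) {
            state = State.STOPPED;
        }
    }
}
```

The benefit of this design is that a late callback, for example one arriving after the process has been stopped, is silently dropped instead of acting on a stale object.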
Creating the Dispatcher
Generating the Dispatcher's ID
private void createDispatcher(
        Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    // Build the DispatcherId from the leader session UUID
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    jobGraphs,
                    recoveredDirtyJobResults,
                    jobGraphStore,
                    jobResultStore);
    completeDispatcherSetup(dispatcherService);
}
Creation goes through an interface here
The implementing class is DefaultDispatcherGatewayServiceFactory
public interface DispatcherGatewayServiceFactory {
    DispatcherGatewayService create(
            DispatcherId dispatcherId,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            JobGraphWriter jobGraphWriter,
            JobResultStore jobResultStore);
}
Finally creating the Dispatcher
The important part is how dispatcherFactory.createDispatcher builds the Dispatcher, so let's keep following the code.
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        JobGraphWriter jobGraphWriter,
        JobResultStore jobResultStore) {
    final Dispatcher dispatcher;
    try {
        // Create the dispatcher; dispatcherFactory is a SessionDispatcherFactory
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        recoveredDirtyJobResults,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobPersistenceComponents.from(
                                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }
    // Start the Dispatcher: through RpcEndpoint, it sends a message to itself
    dispatcher.start();
    return DefaultDispatcherGatewayService.from(dispatcher);
}
The Dispatcher is created by a singleton implemented as an enum
public enum SessionDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    @Override
    public StandaloneDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobPersistenceComponents
                    partialDispatcherServicesWithJobPersistenceComponents)
            throws Exception {
        // create the default dispatcher
        return new StandaloneDispatcher(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobResults,
                dispatcherBootstrapFactory,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobPersistenceComponents,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                        CheckpointResourcesCleanupRunnerFactory.INSTANCE));
    }
}
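For readers unfamiliar with the idiom, an enum with a single constant is a compact way to get a singleton that also implements an interface. A minimal sketch, assuming made-up names (SketchFactory, SketchSessionFactory) and a String in place of a real Dispatcher:

```java
class EnumFactoryDemo {
    public static void main(String[] args) {
        SketchFactory factory = SketchSessionFactory.INSTANCE;
        System.out.println(factory.create("session"));
    }
}

// Hypothetical factory interface; the product is just a String label here.
interface SketchFactory {
    String create(String name);
}

// One enum constant means the JVM guarantees exactly one instance of the factory.
enum SketchSessionFactory implements SketchFactory {
    INSTANCE;

    @Override
    public String create(String name) {
        return "dispatcher-" + name;
    }
}
```

Compared with a hand-written singleton, the enum form needs no private constructor or static holder, and the instance is created safely by class initialization.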
The Dispatcher constructor
/** Once construction has finished and the endpoint is started, execution moves on to the onStart method. */
public Dispatcher(
        RpcService rpcService,
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobs,
        DispatcherBootstrapFactory dispatcherBootstrapFactory,
        DispatcherServices dispatcherServices)
        throws Exception {
    this(
            rpcService,
            fencingToken,
            recoveredJobs,
            recoveredDirtyJobs,
            dispatcherBootstrapFactory,
            dispatcherServices,
            new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
}
Running the Dispatcher's onStart method
@Override
public void onStart() throws Exception {
    try {
        // Start the Dispatcher services
        startDispatcherServices();
    } catch (Throwable t) {
        final DispatcherException exception =
                new DispatcherException(
                        String.format("Could not start the Dispatcher %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }
    // Start the cleanup retries
    startCleanupRetries();
    // Recover interrupted jobs and run them again
    startRecoveredJobs();
    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    this.getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}

// Start the Dispatcher services
private void startDispatcherServices() throws Exception {
    try {
        registerDispatcherMetrics(jobManagerMetricGroup);
    } catch (Exception e) {
        handleStartDispatcherServicesException(e);
    }
}

private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
    jobManagerMetricGroup.gauge(
            MetricNames.NUM_RUNNING_JOBS,
            // metrics can be called from anywhere and therefore, have to run without the main
            // thread safeguard being triggered. For metrics, we can afford to be not 100%
            // accurate
            () -> (long) jobManagerRunnerRegistry.getWrappedDelegate().size());
}
Recovering all failed jobs
Once the jobs have been recovered, recoveredJobs must be cleared. The part to focus on is how runRecoveredJob executes.
private void startRecoveredJobs() {
    for (JobGraph recoveredJob : recoveredJobs) {
        runRecoveredJob(recoveredJob);
    }
    recoveredJobs.clear();
}

private void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);
    try {
        runJob(createJobMasterRunner(recoveredJob), ExecutionType.RECOVERY);
    } catch (Throwable throwable) {
        onFatalError(
                new DispatcherException(
                        String.format(
                                "Could not start recovered job %s.", recoveredJob.getJobID()),
                        throwable));
    }
}
How runJob executes
When a client submits a job in the normal way, the Dispatcher receives it, creates a JobManagerRunner, and starts it. This launches a master program for the job, which requests resources from the JobManager.
Note: for recovered jobs, the JobManagerRunner is started while the Dispatcher itself is starting up.
private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType)
        throws Exception {
    // Start the runner first, then register it
    jobManagerRunner.start();
    jobManagerRunnerRegistry.register(jobManagerRunner);
    final JobID jobId = jobManagerRunner.getJobID();
    // Turn the runner's result (or failure) into a cleanup state on the main thread
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            jobManagerRunner
                    .getResultFuture()
                    .handleAsync(
                            (jobManagerRunnerResult, throwable) -> {
                                Preconditions.checkState(
                                        jobManagerRunnerRegistry.isRegistered(jobId)
                                                && jobManagerRunnerRegistry.get(jobId)
                                                        == jobManagerRunner,
                                        "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
                                if (jobManagerRunnerResult != null) {
                                    return handleJobManagerRunnerResult(
                                            jobManagerRunnerResult, executionType);
                                } else {
                                    return CompletableFuture.completedFuture(
                                            jobManagerRunnerFailed(
                                                    jobId, JobStatus.FAILED, throwable));
                                }
                            },
                            getMainThreadExecutor())
                    .thenCompose(Function.identity());
    // Remove the job once its cleanup state is known; cleanup errors are only logged
    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture.thenCompose(
                    cleanupJobState ->
                            removeJob(jobId, cleanupJobState)
                                    .exceptionally(
                                            throwable ->
                                                    logCleanupErrorWarning(jobId, throwable)));
    FutureUtils.handleUncaughtException(
            jobTerminationFuture,
            (thread, throwable) -> fatalErrorHandler.onFatalError(throwable));
    registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
}
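The future chain in runJob uses a pattern worth isolating: the handler itself returns a future, so the result is a future of a future, and thenCompose(Function.identity()) flattens it. The sketch below shows just that pattern with plain strings; the class and method names are hypothetical, and it uses handle rather than handleAsync with a main-thread executor to stay self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ResultHandlingDemo {
    // Maps a job result (or its failure) to a follow-up "cleanup" future and flattens it.
    static CompletableFuture<String> cleanupStateFor(CompletableFuture<String> resultFuture) {
        CompletableFuture<CompletableFuture<String>> nested =
                resultFuture.handle(
                        (result, throwable) -> {
                            if (result != null) {
                                // Success path: the follow-up work is itself asynchronous.
                                return CompletableFuture.supplyAsync(
                                        () -> "cleanup after " + result);
                            } else {
                                // Failure path: an already completed future is enough.
                                return CompletableFuture.completedFuture(
                                        "cleanup after failure: " + throwable.getMessage());
                            }
                        });
        // Flatten CompletableFuture<CompletableFuture<String>> into CompletableFuture<String>.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(cleanupStateFor(CompletableFuture.completedFuture("job-1")).join());

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        System.out.println(cleanupStateFor(failed).join());
    }
}
```

Using handle (instead of thenCompose directly on the result future) lets one callback cover both the success and the failure case, which is why the extra flattening step is needed afterwards.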
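Summary
This article walked through how the Dispatcher is created. The main thread to hold on to is that, during creation, the Dispatcher restarts failed jobs and then runs them. With this, the five-part walkthrough of the server-side startup is complete.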
Flink Source Code Analysis 3: WebMonitorEndpoint Startup (42)
Flink Source Code Analysis 4: ResourceManager Creation and Startup (44)




