
Author | Li Jie, China Mobile Cloud (移动云), Apache DolphinScheduler contributor

In modern data-driven enterprises, the workflow scheduling system is the "central nervous system" of the data pipeline. From ETL jobs to machine learning training, from report generation to real-time monitoring, almost every critical business process depends on a stable, efficient, and easily extensible scheduling engine.
Since Apache DolphinScheduler 3.1.9 is a stable and widely used release, this article focuses on that version and walks through the startup flow of the Master service. By digging into the core source code, it analyzes the architecture, module layout, and key implementation mechanisms, helping developers understand how the Master works and laying a foundation for further customization or performance tuning.
This series consists of three parts: the Master Server startup flow, the Worker Server startup flow, and the related flow charts. This article is the first part.
1. Master Server Startup Overview
Code entry point: org.apache.dolphinscheduler.server.master.MasterServer#run
public void run() throws SchedulerException {
    // 1. Init RPC server
    this.masterRPCServer.start();

    // 2. Install task plugins
    this.taskPluginManager.loadPlugin();

    // 3. Self tolerance: register with the registry center
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);

    // 4. Master scheduling
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();

    // 5. Event execute service
    this.eventExecuteService.start();

    // 6. Failover mechanism
    this.failoverExecuteThread.start();

    // 7. Quartz scheduling
    this.schedulerApi.start();
    ...
}
1.1 RPC Server Startup
Description: registers the request processors for the relevant commands, such as task running, task execution result, task kill, and so on.
Code entry point: org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer#start
public void start() {
    ...
    // Processor for "task is running" requests
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    // Processor for task execution results
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
    // Processor for task kill responses
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST,
            workflowExecutingDataRequestProcessor);
    // Processor for stream task start requests
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);

    // logger server
    // Processors for log-related operations, e.g. viewing or fetching logs
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

    this.nettyRemotingServer.start();
    logger.info("Started Master RPC Server...");
}
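Internally, registerProcessor is essentially filling a dispatch table: the Netty handler looks up the processor by the command type of each incoming message and invokes it. The sketch below shows that pattern only; CommandType, Processor, and the dispatcher class here are simplified stand-ins for illustration, not the actual classes of the dolphinscheduler-remote module.

import java.util.EnumMap;
import java.util.Map;

// Minimal sketch of "register a processor per command type, dispatch on receive".
public class CommandDispatcherSketch {

    public enum CommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_KILL_RESPONSE }

    public interface Processor {
        void process(byte[] commandBody);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    public void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    // Called by the network layer for every incoming command.
    public void onCommandReceived(CommandType type, byte[] body) {
        Processor processor = processors.get(type);
        if (processor == null) {
            throw new IllegalArgumentException("no processor registered for " + type);
        }
        processor.process(body);
    }
}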
1.2 Task Plugin Initialization
Description: provides the template operations for tasks, such as creating a task, parsing task parameters, and obtaining task resource information. The plugin must be registered by the api, master, and worker services alike; on the master its role is to set data source and UDF information, etc.
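Under the hood, the loading step is essentially a Java SPI lookup: each task plugin ships factory classes on the classpath, and loadPlugin() collects them into a map keyed by task type. Below is a minimal, self-contained sketch of that idea; PluginTaskFactory and the loader class are illustrative stand-ins, not the real TaskChannelFactory / TaskPluginManager API.

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Stand-in for the factory interface each task plugin (e.g. shell, sql, spark) would expose.
interface PluginTaskFactory {
    String taskType();              // e.g. "SHELL"
    Object createTaskChannel();     // the real code returns a TaskChannel
}

// Minimal sketch of SPI-based plugin loading, similar in spirit to TaskPluginManager#loadPlugin.
public class TaskPluginLoaderSketch {

    private final Map<String, PluginTaskFactory> factories = new HashMap<>();

    public void loadPlugins() {
        // ServiceLoader scans META-INF/services entries on the classpath, so adding a new
        // plugin jar is enough to make its task type available to the server.
        for (PluginTaskFactory factory : ServiceLoader.load(PluginTaskFactory.class)) {
            factories.put(factory.taskType(), factory);
        }
    }

    public PluginTaskFactory getFactory(String taskType) {
        return factories.get(taskType);
    }
}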
1.3 Self Tolerance (Master Registration)
Description: registers this master with the registry center (ZooKeeper is used as the example in this article) and subscribes to the registration changes of itself, the other masters, and all worker nodes, so that the corresponding failover can be performed.
Code entry point: org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient#start
public void start() {
    try {
        this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
        // 1. Register this master with the registry center
        registry();
        // 2. Listen for the connection state between this master and the registry center
        registryClient.addConnectionStateListener(
                new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
        // 3. Subscribe to the liveness of the other masters and all workers in the registry center
        //    and perform the corresponding failover, e.g. take over the workflow instances of a dead
        //    master and kill the related tasks on the workers
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
    } catch (Exception e) {
        throw new RegistryException("Master registry client start up error", e);
    }
}
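Conceptually, MasterRegistryDataListener only cares about node-removed events under the master and worker paths and hands them to the failover logic. The sketch below illustrates that dispatch pattern with simplified, hypothetical types (RegistryEvent, NodeRemovedHandler); it is not the actual listener implementation.

// Simplified model of a registry data listener reacting to node changes.
public class MasterRegistryDataListenerSketch {

    public enum EventType { ADD, REMOVE, UPDATE }

    public static final class RegistryEvent {
        final EventType type;
        final String path;   // e.g. /nodes/master/192.168.0.1:5678
        RegistryEvent(EventType type, String path) { this.type = type; this.path = path; }
    }

    public interface NodeRemovedHandler {
        void onRemoved(String host);
    }

    private final NodeRemovedHandler masterFailover;
    private final NodeRemovedHandler workerFailover;

    public MasterRegistryDataListenerSketch(NodeRemovedHandler masterFailover, NodeRemovedHandler workerFailover) {
        this.masterFailover = masterFailover;
        this.workerFailover = workerFailover;
    }

    public void onEvent(RegistryEvent event) {
        if (event.type != EventType.REMOVE) {
            return;   // only node removal triggers failover
        }
        String host = event.path.substring(event.path.lastIndexOf('/') + 1);
        if (event.path.contains("/master/")) {
            masterFailover.onRemoved(host);   // take over workflow instances of the dead master
        } else if (event.path.contains("/worker/")) {
            workerFailover.onRemoved(host);   // fail over task instances running on the dead worker
        }
    }
}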
1.4 Master Scheduling
Description: a scanning thread that periodically scans the command table in the database and performs different business operations depending on the command type. It is the core logic behind starting workflows, failing over workflow instances, and so on.
Code entry point: org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                // the current server is not at running status, cannot consume command.
                logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            // todo: if the workflow event queue is much, we need to handle the back pressure
            boolean isOverload =
                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            // If the CPU or memory load is too high, skip consuming commands for now
            if (isOverload) {
                logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Fetch commands from the database, e.g. start a workflow, failover a workflow instance
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                // indicate that no command, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            // Convert the commands into workflow instances; the commands are deleted once the conversion succeeds
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                // indicate that the command transform to processInstance error, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            MasterServerMetrics.incMasterConsumeCommand(commands.size());

            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        logger.error("The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    // Create the workflow execution runnable, which is responsible for splitting the DAG,
                    // submitting and monitoring tasks, and handling the different event types
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
                            processService,
                            processInstanceDao,
                            nettyExecutorManager,
                            processAlertManager,
                            masterConfig,
                            stateWheelExecuteThread,
                            curingGlobalParamsService);
                    // Cache each workflow execution runnable; later it is fetched from the cache and executed
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    // Put a "start workflow" event into the workflow event queue;
                    // workflowEventLooper keeps polling the queue and handling the events
                    workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                            processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            // sleep for 1s here to avoid the database down cause the exception boom
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
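One detail worth noting about findCommands(): when several masters run at the same time, each of them only consumes part of the command table, based on a slot derived from the list of alive masters in the registry (roughly, command id modulo the number of masters). The following is a minimal sketch of that partitioning idea with simplified stand-in types; in the real code the filter is pushed down into the SQL query rather than applied in memory.

import java.util.List;
import java.util.stream.Collectors;

// Minimal sketch of slot-based command partitioning: with N masters alive, each master only
// consumes the commands whose id falls into its own slot, so two masters never pick up the
// same command. Command here is a simplified stand-in holding just an id.
public class CommandSlotFilterSketch {

    public static final class Command {
        final int id;
        public Command(int id) { this.id = id; }
    }

    /**
     * @param allCommands commands read from the command table
     * @param totalSlot   number of alive masters (taken from the registry)
     * @param currentSlot index of this master in the sorted master list
     */
    public static List<Command> filterBySlot(List<Command> allCommands, int totalSlot, int currentSlot) {
        return allCommands.stream()
                .filter(command -> command.id % totalSlot == currentSlot)
                .collect(Collectors.toList());
    }
}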
Once a workflow event has been produced by the steps above, WorkflowEventLooper keeps consuming and handling it:
public void run() {
    WorkflowEvent workflowEvent = null;
    while (!ServerLifeCycleManager.isStopped()) {
        ...
        workflowEvent = workflowEventQueue.poolEvent();
        LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
        logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
        WorkflowEventHandler workflowEventHandler =
                workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
        // The matching event handler processes the workflow event; its main job is to execute
        // the WorkflowExecuteRunnable cached in the previous step
        workflowEventHandler.handleWorkflowEvent(workflowEvent);
        ...
    }
}
When the WorkflowExecuteRunnable starts, its main job is to initialize the DAG and then to submit and dispatch tasks:
public WorkflowSubmitStatue call() {
    ...
    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
    if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
        // Build the workflow DAG
        buildFlowDag();
        workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
        // Initialize (i.e. clear) the related queues
        initTaskQueue();
        workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
        // Start from the root nodes and submit all node tasks
        submitPostNode(null);
        workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
        logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
    }
    return WorkflowSubmitStatue.SUCCESS;
    ...
}
At this point parentNodeCode is null, meaning all nodes are started from the root node:
private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
    ...
    // Get the downstream tasks to be executed, based on the start node parentNodeCode
    List<TaskInstance> taskInstances = ...
    for (TaskInstance task : taskInstances) {
        ...
        // Put the task into the "ready to submit" queue readyToSubmitTaskQueue
        addTaskToStandByList(task);
    }
    // Process the "ready to submit" queue readyToSubmitTaskQueue and submit the tasks
    submitStandByTask();
    ...
}
public void submitStandByTask() throws StateEventHandleException {
    int length = readyToSubmitTaskQueue.size();
    for (int i = 0; i < length; i++) {
        TaskInstance task = readyToSubmitTaskQueue.peek();
        ...
        // Check whether the task's dependencies have been satisfied; if so, submit it
        DependResult dependResult = getDependResultForTask(task);
        if (DependResult.SUCCESS == dependResult) {
            logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
            // Submit the task
            Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
            if (!taskInstanceOptional.isPresent()) {
                // Submission failed
                ...
            } else {
                // Submission succeeded, remove the task from the "ready to submit" queue
                removeTaskFromStandbyList(task);
            }
        }
        ...
    }
}
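The dependency check in getDependResultForTask boils down to looking at the states of a task's upstream nodes in the DAG: only when every predecessor has finished successfully does the task become submittable. The sketch below illustrates that rule with plain Java collections; the enums and the state map are simplified stand-ins, not the real DAG / DependResult types (which also handle conditions, skipped branches, and forbidden tasks).

import java.util.List;
import java.util.Map;

// Simplified illustration of the "all upstream tasks succeeded" rule used before submitting a task.
public class DependCheckSketch {

    public enum State { SUBMITTED, RUNNING, SUCCESS, FAILURE }
    public enum DependResult { SUCCESS, WAITING, FAILED }

    /**
     * @param upstreamCodes codes of the task's predecessor nodes in the DAG
     * @param taskStates    current state of each known task instance, keyed by task code
     */
    public static DependResult getDependResult(List<Long> upstreamCodes, Map<Long, State> taskStates) {
        for (Long code : upstreamCodes) {
            State state = taskStates.get(code);
            if (state == State.FAILURE) {
                return DependResult.FAILED;      // an upstream task failed: do not submit
            }
            if (state != State.SUCCESS) {
                return DependResult.WAITING;     // still waiting for an upstream task to finish
            }
        }
        return DependResult.SUCCESS;             // all predecessors succeeded: ready to submit
    }
}

Once the check passes, submitTaskExec (shown next) performs the actual submission: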
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
    ...
    // Initialize according to the master-side task type (not engine types such as Shell or Spark,
    // but types like Common, Condition, SubTask, SwitchTask, etc.). For readability, the common
    // task type is assumed in the following description.
    ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
    taskProcessor.init(taskInstance, processInstance);
    ...
    // Fill in the taskInstance parameters and persist it to the database
    boolean submit = taskProcessor.action(TaskAction.SUBMIT);
    ...
    // For the common task type, put the task into the to-be-dispatched queue taskPriorityQueue,
    // which is consumed by a dedicated consumer, TaskPriorityQueueConsumer
    boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
    ...
    // For the common task type, RUN does nothing
    taskProcessor.action(TaskAction.RUN);
    // Add a timeout check; an alert is raised if the task times out
    stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
    // Add a state check; on success or other states the corresponding handling is triggered
    stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
    ...
    return Optional.of(taskInstance);
    ...
}
TaskPriorityQueueConsumer is a dedicated thread that consumes the taskPriorityQueue mentioned above; it starts listening to the queue when the program starts:
public void run() {
    int fetchTaskNum = masterConfig.getDispatchTaskNumber();
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // Consume the tasks that need to be dispatched:
            // pick an available worker node for each task, then dispatch the task to that worker
            List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
            ...
        } catch (Exception e) {
            TaskMetrics.incTaskDispatchError();
            logger.error("dispatcher task error", e);
        }
    }
}
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
    ...
    // Consume the tasks concurrently with a thread pool
    CountDownLatch latch = new CountDownLatch(fetchTaskNum);
    for (int i = 0; i < fetchTaskNum; i++) {
        TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
        ...
        consumerThreadPoolExecutor.submit(() -> {
            try {
                // Dispatch the task
                boolean dispatchResult = this.dispatchTask(taskPriority);
                ...
            } finally {
                // make sure the latch countDown
                latch.countDown();
            }
        });
    }
    latch.await();
    ...
}
protected boolean dispatchTask(TaskPriority taskPriority) {
    ...
    try {
        WorkflowExecuteRunnable workflowExecuteRunnable =
                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
        ...
        Optional<TaskInstance> taskInstanceOptional =
                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
        ...
        TaskInstance taskInstance = taskInstanceOptional.get();
        TaskExecutionContext context = taskPriority.getTaskExecutionContext();
        ExecutionContext executionContext =
                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
        ...
        // Pick an available worker node and dispatch the task to it
        result = dispatcher.dispatch(executionContext);
        ...
    } catch (RuntimeException | ExecuteException e) {
        logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
    }
    return result;
}
The actual dispatch operation:
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
    ...
    // host select
    // Pick a worker node that matches the configured host selector
    Host host = hostManager.select(context);
    ...
    context.setHost(host);
    ...
    // Send the task to the selected worker via RPC; if sending fails, try other available workers
    return executorManager.execute(context);
    ...
}
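Which worker ends up being chosen depends on the master's host-selector configuration; as far as I understand, 3.1.x ships random, round-robin, and lower-weight strategies, with the lower-weight one factoring in the load each worker reports in its heartbeat. The following is a minimal round-robin sketch over one worker group, with simplified types, just to show the shape of the selection step; it is not the real HostManager implementation.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin host selection over the workers of one worker group.
// A "host" is just a host:port string here; the real selector works on richer worker metadata.
public class RoundRobinSelectorSketch {

    private final AtomicInteger index = new AtomicInteger(0);

    public String select(List<String> workerHosts) {
        if (workerHosts == null || workerHosts.isEmpty()) {
            throw new IllegalStateException("no alive worker in the worker group");
        }
        // Mask with Integer.MAX_VALUE so the counter stays non-negative even after it overflows.
        int next = (index.getAndIncrement() & Integer.MAX_VALUE) % workerHosts.size();
        return workerHosts.get(next);
    }
}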
1.5 Event Execution Service
Description: mainly responsible for polling the event queue of each workflow instance. A workflow keeps producing events while it runs, such as workflow submission failures and task state changes; the method below handles those events.
Code entry point: org.apache.dolphinscheduler.server.master.runner.EventExecuteService#run
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            // Handle events of workflow execution runnables;
            // this eventually triggers WorkflowExecuteRunnable#handleEvents
            workflowEventHandler();
            // Handle events of stream task execution runnables
            streamTaskEventHandler();
            TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
        }
        ...
    }
}
The event handling logic for workflows and stream tasks is basically the same; only the workflow case is described below:
public void handleEvents() {
    ...
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            ...
            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            // Each event type is handled by its own handler with its own logic
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        }
        ...
    }
}
Take a workflow submission failure as an example:
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
                                StateEvent stateEvent) throws StateEventHandleException {
    WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent) stateEvent;
    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
    measureProcessState(workflowStateEvent);
    log.info("Handle workflow instance submit fail state event, the current workflow instance state {} will be changed to {}",
            processInstance.getState(), workflowStateEvent.getStatus());
    // Change the instance state to FAILURE and persist it to the database
    workflowExecuteRunnable.updateProcessInstanceState(workflowStateEvent);
    workflowExecuteRunnable.endProcess();
    return true;
}
1.6 Failover Mechanism
Description: mainly responsible for the master failover and worker failover logic.
Code entry point: org.apache.dolphinscheduler.server.master.service.MasterFailoverService#checkMasterFailover
public void checkMasterFailover() {
    // Get the master hosts that need failover
    List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // failover myself || dead server
            // this master itself, or a master that has already died
            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
        return;
    }
    ...
    for (String needFailoverMasterHost : needFailoverMasterHosts) {
        failoverMaster(needFailoverMasterHost);
    }
}
private void doFailoverMaster(@NonNull String masterHost) {
    ...
    // Get the master's startup time from the registry center
    Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
    // Get the workflow instances of this master that need failover
    // (filtered mainly by state, e.g. SUBMITTED_SUCCESS, RUNNING_EXECUTION)
    List<ProcessInstance> needFailoverProcessInstanceList =
            processService.queryNeedFailoverProcessInstances(masterHost);
    ...
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        ...
        // Decide whether this instance needs failover, e.g.:
        // 1. The dead master has not restarted yet, so failover is needed
        // 2. The workflow instance started earlier than the master's startup time, which means the
        //    master has restarted, so failover is needed
        // ...
        if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
            LOGGER.info("WorkflowInstance doesn't need to failover");
            continue;
        }
        List<TaskInstance> taskInstanceList = ...
        for (TaskInstance taskInstance : taskInstanceList) {
            ...
            if (!checkTaskInstanceNeedFailover(taskInstance)) {
                LOGGER.info("The taskInstance doesn't need to failover");
                continue;
            }
            // For tasks on the worker side, kill them and mark the task instance state as NEED_FAULT_TOLERANCE
            failoverTaskInstance(processInstance, taskInstance);
            ...
        }
        ProcessInstanceMetrics.incProcessInstanceByState("failover");
        // updateProcessInstance host is null to mark this processInstance has been failover
        // and insert a failover command
        processInstance.setHost(Constants.NULL);
        // Generate a failover command and persist it, to be picked up later by the master scheduler scan
        processService.processNeedFailoverProcessInstances(processInstance);
        ...
    }
}
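The two "needs failover" conditions in the comments above amount to a registry check plus a time comparison. Below is a hedged, simplified sketch of that decision for a workflow instance; the parameters are stand-ins, and the real check also considers other details from the registry heartbeat.

import java.util.Date;
import java.util.Optional;

// Simplified version of the "does this workflow instance need failover?" decision.
public class FailoverCheckSketch {

    /**
     * @param masterStartupTime     startup time of the instance's master, if that master is registered again
     * @param instanceStartTime     start time of the workflow instance
     * @param masterStillRegistered whether the instance's master host is currently present in the registry
     */
    public static boolean processInstanceNeedFailover(Optional<Date> masterStartupTime,
                                                      Date instanceStartTime,
                                                      boolean masterStillRegistered) {
        if (!masterStillRegistered) {
            // Case 1: the owning master is gone and has not come back, so take the instance over.
            return true;
        }
        // Case 2: the master is back, but the instance was started before the master restarted,
        // so the in-memory execution state was lost and the instance must be taken over.
        return masterStartupTime.isPresent() && instanceStartTime.before(masterStartupTime.get());
    }
}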
2. Conclusion
The above is my initial understanding of the features and architecture of Apache DolphinScheduler 3.1.9, compiled from personal study and practice. Follow-up articles will cover the Worker startup flow and the interaction flow between Master and Worker. Given my limited experience, misunderstandings or omissions are inevitable; corrections from readers are very welcome. If you have different views, feel free to discuss so we can improve together.

Join the Community
There are many ways to follow the community:
GitHub: https://github.com/apache/dolphinscheduler
Website: https://dolphinscheduler.apache.org/en-us
Developer mailing list: dev@dolphinscheduler.apache.org (send any message to the list address, then reply to the confirmation email to complete the subscription)
X.com: @DolphinSchedule
YouTube: https://www.youtube.com/@apachedolphinscheduler
Slack: https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg
Likewise, there are many ways to contribute to Apache DolphinScheduler, falling broadly into code and non-code contributions.
📂 Non-code contributions include:
improving and translating documentation; translating technical and practical articles; contributing practical or deep-dive articles; becoming an evangelist; community management and Q&A; giving talks at meetups; testing and feedback; user feedback; and so on.
👩‍💻 Code contributions include:
finding bugs; writing fixes; developing new features; submitting code contributions; participating in code reviews; and so on.








