This analysis is based on version 1.2.0, which differs in places from the latest implementation, so please read this source-code walkthrough with a critical eye. Some details may not be described entirely accurately; treat it as a reference only.

```sql
SELECT command.*
FROM t_ds_command command
JOIN t_ds_process_definition definition
  ON command.process_definition_id = definition.id
WHERE definition.release_state = 1
  AND definition.flag = 1
ORDER BY command.update_time ASC
LIMIT 1
```
A running task occupies one thread on the master and one on the worker, which is not ideal.

Looked at this way, a single task execution actually ties up two master threads.
- dolphinscheduler-ui: the front-end UI module
- dolphinscheduler-server: the core module, including the master/worker functionality
- dolphinscheduler-common: the common module, with shared methods and classes
- dolphinscheduler-api: the RESTful interfaces, the layer the front end talks to, which in turn interacts with master/worker
- dolphinscheduler-dao: the data access layer, with entity definitions and persistence
- dolphinscheduler-alert: the alerting module, with alert-related methods and features
- dolphinscheduler-rpc: log viewing, providing the RPC service for viewing logs in real time
- dolphinscheduler-dist: the build and distribution module, with no business logic of its own
- Skip the UI itself
- Start from the API module that the UI talks to
- Focus the analysis on the core features
- Only skim the non-core features
```java
@SpringBootApplication
@ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true")
@ServletComponentScan
@ComponentScan("org.apache.dolphinscheduler")
@Import({MasterServer.class, WorkerServer.class})
@EnableSwagger2
public class CombinedApplicationServer extends SpringBootServletInitializer {

    public static void main(String[] args) throws Exception {
        ApiApplicationServer.main(args);

        LoggerServer server = new LoggerServer();
        server.start();

        AlertServer alertServer = AlertServer.getInstance();
        alertServer.start();
    }
}
```
I'm also not sure what the point of this embedding is; wouldn't embedding it directly be better?

```java
public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                      @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
                                      @RequestParam(value = "name", required = true) String name,
                                      @RequestParam(value = "processDefinitionJson", required = true) String json,
                                      @RequestParam(value = "locations", required = true) String locations,
                                      @RequestParam(value = "connects", required = true) String connects,
                                      @RequestParam(value = "description", required = false) String description) {
    try {
        logger.info("login user {}, create process definition, project name: {}, process definition name: {}, " +
                        "process_definition_json: {}, desc: {} locations:{}, connects:{}",
                loginUser.getUserName(), projectName, name, json, description, locations, connects);
        Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json,
                description, locations, connects);
        return returnDataList(result);
    } catch (Exception e) {
        logger.error(Status.CREATE_PROCESS_DEFINITION.getMsg(), e);
        return error(Status.CREATE_PROCESS_DEFINITION.getCode(), Status.CREATE_PROCESS_DEFINITION.getMsg());
    }
}
```
But this code blurs the line between controller and service: who is responsible for the HTTP response? Here the response is produced by the service, which builds a result of type Map&lt;String, Object&gt; and calls result.put("processDefinitionId", processDefine.getId()) to set the data that is ultimately returned. Personally I don't agree with this approach. Strictly speaking, the service should only return business entities, and what the HTTP response contains should be decided by the controller.
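To make the point concrete, here is a minimal sketch of the split I have in mind. Every class and method name below is a hypothetical stand-in, not the actual DolphinScheduler API:

```java
import java.util.Map;

// All names below are hypothetical stand-ins, not the real DolphinScheduler API.
class ServiceException extends RuntimeException {
    private final int code;
    ServiceException(int code, String msg) { super(msg); this.code = code; }
    int getCode() { return code; }
}

record Result(int code, String msg, Object data) {
    static Result success(Object data) { return new Result(0, "success", data); }
    static Result error(int code, String msg) { return new Result(code, msg, null); }
}

class ProcessDefinition {
    int getId() { return 42; }
}

// The service returns the business entity only; failures become exceptions.
class ProcessDefinitionService {
    ProcessDefinition create(String projectName, String name, String json) {
        // ... permission check, cycle check, insert into DB ...
        return new ProcessDefinition();
    }
}

// The controller alone shapes the HTTP response.
class ProcessDefinitionController {
    private final ProcessDefinitionService service = new ProcessDefinitionService();

    Result create(String projectName, String name, String json) {
        try {
            ProcessDefinition def = service.create(projectName, name, json);
            return Result.success(Map.of("processDefinitionId", def.getId()));
        } catch (ServiceException e) {
            return Result.error(e.getCode(), e.getMessage());
        }
    }
}
```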
```java
public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name,
                                                   String processDefinitionJson, String desc, String locations,
                                                   String connects) throws JsonProcessingException {

    Map<String, Object> result = new HashMap<>(5);
    Project project = projectMapper.queryByName(projectName);
    // check project auth
    Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
    Status resultStatus = (Status) checkResult.get(Constants.STATUS);
    if (resultStatus != Status.SUCCESS) {
        return checkResult;
    }

    ProcessDefinition processDefine = new ProcessDefinition();
    Date now = new Date();

    ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
    Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
    if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
        return checkProcessJson;
    }

    processDefine.setName(name);
    processDefine.setReleaseState(ReleaseState.OFFLINE);
    processDefine.setProjectId(project.getId());
    processDefine.setUserId(loginUser.getId());
    processDefine.setProcessDefinitionJson(processDefinitionJson);
    processDefine.setDescription(desc);
    processDefine.setLocations(locations);
    processDefine.setConnects(connects);
    processDefine.setTimeout(processData.getTimeout());
    processDefine.setTenantId(processData.getTenantId());

    // custom global params
    List<Property> globalParamsList = processData.getGlobalParams();
    if (globalParamsList != null && globalParamsList.size() > 0) {
        Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
        globalParamsList = new ArrayList<>(globalParamsSet);
        processDefine.setGlobalParamList(globalParamsList);
    }
    processDefine.setCreateTime(now);
    processDefine.setUpdateTime(now);
    processDefine.setFlag(Flag.YES);
    processDefineMapper.insert(processDefine);
    putMsg(result, Status.SUCCESS);
    result.put("processDefinitionId", processDefine.getId());
    return result;
}
```
- Check whether the current user has permission on the project
- Validate the process-definition JSON, e.g. whether it contains cycles
- Build a ProcessDefinition object and insert it into the database
- Set the HTTP response
The responsibilities of ProcessDefinitionService are really unreasonable: it even performs authorization. As I see it, validating and inserting into the database would be enough; everything else could be pushed out.
```java
@Data
@TableName("t_ds_process_definition")
public class ProcessDefinition
```
- Check whether the current user has permission on the project
- Validate the process-definition JSON, e.g. whether it contains cycles
- Build a Schedule object and insert it into the database
- Set the HTTP response


- Call OSUtils.checkResource to check the current resources (memory, CPU)
- If resources exceed the threshold, sleep one second and start the next iteration
- Check whether the ZooKeeper connection is alive
- Acquire an InterProcessMutex lock (a distributed fair reentrant mutex), meaning only one master can hold it at a time
- Query one Command; if it is not null, run the follow-up logic
- Sleep one second and enter the next iteration
- Release the InterProcessMutex lock before the next iteration
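Since the scan loop leans on Curator's InterProcessMutex, a minimal standalone sketch of the acquire/release pattern may help; the connection string, retry policy and lock path here are illustrative, not the exact values DolphinScheduler uses:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MasterLockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // only one process cluster-wide can hold the mutex for this path
        InterProcessMutex mutex = new InterProcessMutex(client, "/demo/lock/masters");
        mutex.acquire();
        try {
            // critical section: e.g. fetch and handle one command
        } finally {
            mutex.release(); // always release before the next loop iteration
        }
        client.close();
    }
}
```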
```java
/**
 * if the process closes, a signal is placed as true, and all threads get this flag to stop working
 */
public class Stopper {

    private static volatile AtomicBoolean signal = new AtomicBoolean(false);

    public static final boolean isStoped() {
        return signal.get();
    }

    public static final boolean isRunning() {
        return !signal.get();
    }

    public static final void stop() {
        signal.getAndSet(true);
    }
}
```
```java
if (command != null) {
    logger.info(String.format("find one command: id: %d, type: %s", command.getId(), command.getCommandType().toString()));

    try {
        processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
        if (processInstance != null) {
            logger.info("start master exec thread , split DAG ...");
            masterExecService.execute(new MasterExecThread(processInstance, processDao));
        }
    } catch (Exception e) {
        logger.error("scan command error ", e);
        processDao.moveToErrorCommand(command, e.toString());
    }
}
```
- Call OSUtils.checkResource to check the current resources (memory, CPU)
- If resources exceed the threshold, sleep one second and start the next iteration
- Check whether the ZooKeeper connection is alive
- Acquire an InterProcessMutex lock (a distributed fair reentrant mutex), meaning only one master can hold it at a time
- Query one Command; if enough threads are free, create a process instance (ProcessInstance) and hand it to a MasterExecThread
- Sleep one second and enter the next iteration
- Release the InterProcessMutex lock before the next iteration

But personally I find its definition not very clean. If it is meant to be the complete set of mapper operations, then no other code should ever call a mapper directly, which is not the case; if it is only meant for process-definition-related operations, its scope is far too large.
```java
public MasterExecThread(ProcessInstance processInstance, ProcessDao processDao) {
    this.processDao = processDao;
    this.processInstance = processInstance;

    int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS,
            Constants.defaultMasterTaskExecNum);
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
            masterTaskExecNum);
}
```
```java
static {
    try {
        conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
    } catch (ConfigurationException e) {
        logger.error("load configuration failed : " + e.getMessage(), e);
        System.exit(1);
    }
}
```
A configuration field like this could perfectly well be a single global instance instead of being passed around everywhere; there is no need to new another one. Normally the contents of this class never change at runtime anyway.
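A sketch of the kind of process-wide holder this suggests, reusing the same commons-configuration class the code already loads (the file name is illustrative):

```java
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

// One process-wide configuration instance, loaded once and shared.
public final class MasterConfig {
    private static final PropertiesConfiguration CONF;

    static {
        try {
            CONF = new PropertiesConfiguration("master.properties");
        } catch (ConfigurationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private MasterConfig() { }

    public static PropertiesConfiguration get() {
        return CONF;
    }
}
```

Every thread could then call MasterConfig.get() instead of constructing its own PropertiesConfiguration.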
```java
@Override
public void run() {
    // process instance is null
    if (processInstance == null) {
        logger.info("process instance is not exists");
        return;
    }

    // check to see if it's done
    if (processInstance.getState().typeIsFinished()) {
        logger.info("process instance is done : {}", processInstance.getId());
        return;
    }

    try {
        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
            // sub process complement data
            executeComplementProcess();
        } else {
            // execute flow
            executeProcess();
        }
    } catch (Exception e) {
        logger.error("master exec thread exception: " + e.getMessage(), e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processDao.updateProcessInstance(processInstance);
    } finally {
        taskExecService.shutdown();
        // post handle
        postHandle();
    }
}
```
- Check whether processInstance is null; if so, return
- Check whether processInstance has already finished (success, failure, cancel, pause, waiting)
- Check whether this is a complement-data run; if so, take the complement-data path
- Execute the process instance (executeProcess)
- Call taskExecService.shutdown() and wait for all threads to exit normally
The first step feels a bit redundant.
```java
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
    if (task.isTaskComplete()) {
        completeTaskList.put(task.getName(), task);
    }
    if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
        errorTaskList.put(task.getName(), task);
    }
}
```
```java
private void buildFlowDag() throws Exception {
    recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());

    forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());

    // generate process to get DAG info
    List<String> recoveryNameList = getRecoveryNodeNameList();
    List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
    ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
            startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
    if (processDag == null) {
        logger.error("processDag is null");
        return;
    }
    // generate process dag
    dag = DagHelper.buildDagGraph(processDag);
}
```
```java
/**
 * the object of DAG
 */
private DAG<String, TaskNode, TaskNodeRelation> dag;
```
```java
/**
 * analysis of DAG
 * Node: node
 * NodeInfo: node description information
 * EdgeInfo: edge description information
 */
public class DAG<Node, NodeInfo, EdgeInfo>
```


```java
public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();

    // add vertex
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getName(), node);
        }
    }

    // add edge
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}
```
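Earlier, the validation step of createProcessDefinition mentioned checking the JSON for cycles. A self-contained sketch of how such a check can be done with Kahn's algorithm, purely for illustration (the real DAG class has its own implementation):

```java
import java.util.*;

// Kahn's algorithm: a graph is acyclic iff a topological sort visits every node.
public class CycleCheckDemo {
    public static boolean hasCycle(Map<String, List<String>> adjacency) {
        // compute in-degrees for every node
        Map<String, Integer> inDegree = new HashMap<>();
        adjacency.keySet().forEach(n -> inDegree.putIfAbsent(n, 0));
        for (List<String> targets : adjacency.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }
        // start from all nodes with no incoming edges
        Deque<String> queue = new ArrayDeque<>();
        inDegree.forEach((node, deg) -> { if (deg == 0) queue.add(node); });

        int visited = 0;
        while (!queue.isEmpty()) {
            String node = queue.poll();
            visited++;
            for (String t : adjacency.getOrDefault(node, List.of())) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    queue.add(t);
                }
            }
        }
        return visited != inDegree.size(); // unvisited nodes imply a cycle
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = Map.of(
                "A", List.of("B"),
                "B", List.of("C"),
                "C", List.of());
        System.out.println(hasCycle(dag)); // false: A -> B -> C is acyclic
    }
}
```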


- Call submitPostNode(null)
- Enter a while loop that runs until the process instance stops (success, failure, cancel, pause, waiting)
- First check for timeout; if timed out, send an alert email
- Get the Map of currently active task nodes: the key is a MasterBaseTaskExecThread and the value a Future&lt;Boolean&gt;, i.e. the current state of that MasterBaseTaskExecThread
- If a task instance has finished, remove it from the Map
- If a task instance succeeded, put it into completeTaskList and call submitPostNode(task.getName())
- If a task instance failed, retry it; otherwise just finish (e.g. a manual stop or pause)
- Update the state of the process instance and go on to the next iteration
```java
private void submitPostNode(String parentNodeName) {

    List<TaskInstance> submitTaskList = null;
    if (parentNodeName == null) {
        submitTaskList = getStartSubmitTaskList();
    } else {
        submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName);
    }
    // if previous node success , post node submit
    for (TaskInstance task : submitTaskList) {
        if (readyToSubmitTaskList.containsKey(task.getName())) {
            continue;
        }

        if (completeTaskList.containsKey(task.getName())) {
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString());
        } else {
            addTaskToStandByList(task);
        }
    }
}
```
```java
/**
 * submit task to execute
 * @param taskInstance task instance
 * @return TaskInstance
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if (taskInstance.isSubProcess()) {
        abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
    } else {
        abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}
```
```java
public class MasterTaskExecThread extends MasterBaseTaskExecThread
```
```java
/**
 * constructor of MasterBaseTaskExecThread
 * @param taskInstance task instance
 * @param processInstance process instance
 */
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance) {
    this.processDao = BeanContext.getBean(ProcessDao.class);
    this.alertDao = BeanContext.getBean(AlertDao.class);
    this.processInstance = processInstance;
    this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
    this.cancel = false;
    this.taskInstance = taskInstance;
}
```
```java
/**
 * submit task instance and wait complete
 * @return true is task quit is true
 */
@Override
public Boolean submitWaitComplete() {
    Boolean result = false;
    this.taskInstance = submit();
    if (!this.taskInstance.getState().typeIsFinished()) {
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processDao.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
    return result;
}
```
As we can see, every task instance can update the database, and together with the other threads this may put heavy pressure on it. With very many tasks and high concurrency, the JDBC connection pool needs to be sized up accordingly; otherwise the database becomes the system bottleneck. And if there are many worker nodes, this pressure grows multiplicatively.
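As a hedged illustration of "sizing up the pool": the DAO layer in this version uses Alibaba Druid, if I recall correctly, and programmatic sizing would look roughly like this (all numbers are placeholders to be tuned against the real task concurrency):

```java
import com.alibaba.druid.pool.DruidDataSource;

// Illustrative pool sizing; tune to the actual task concurrency.
public class DataSourceDemo {
    public static DruidDataSource buildDataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl("jdbc:mysql://localhost:3306/dolphinscheduler");
        ds.setUsername("ds_user");
        ds.setPassword("ds_password");
        ds.setInitialSize(10);     // connections created at startup
        ds.setMinIdle(10);         // keep a warm floor of idle connections
        ds.setMaxActive(200);      // raise this when task concurrency is high
        ds.setMaxWait(60_000);     // ms to wait for a free connection
        return ds;
    }
}
```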
```java
protected TaskInstance submit() {
    Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES,
            Constants.defaultMasterCommitRetryTimes);
    Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL,
            Constants.defaultMasterCommitRetryInterval);

    int retryTimes = 1;

    while (retryTimes <= commitRetryTimes) {
        try {
            TaskInstance task = processDao.submitTask(taskInstance, processInstance);
            if (task != null) {
                return task;
            }
            logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and queue failed : " + e.getMessage(), e);
        }
        retryTimes += 1;
    }
    return null;
}
```
```java
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance) {
    logger.info("start submit task : {}, instance id:{}, state: {}, ",
            taskInstance.getName(), processInstance.getId(), processInstance.getState());
    processInstance = this.findProcessInstanceDetailById(processInstance.getId());
    // submit to mysql
    TaskInstance task = submitTaskInstanceToMysql(taskInstance, processInstance);
    if (task.isSubProcess() && !task.getState().typeIsFinished()) {
        ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);

        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
        Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
        createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
    } else if (!task.getState().typeIsFinished()) {
        // submit to task queue
        task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
        submitTaskToQueue(task);
    }
    logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
            taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
    return task;
}
```
```java
public Boolean submitTaskToQueue(TaskInstance taskInstance) {

    try {
        // task cannot submit when running
        if (taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION) {
            logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        if (checkTaskExistsInTaskQueue(taskInstance)) {
            logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to queue: {}", taskInstance);
        taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
        logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()));
        return true;
    } catch (Exception e) {
        logger.error("submit task to queue Exception: ", e);
        logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
        return false;
    }
}
```
```java
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
```
```java
public static ITaskQueue getTaskQueueInstance() {
    String queueImplValue = CommonUtils.getQueueImplValue();
    if (StringUtils.isNotBlank(queueImplValue)) {
        logger.info("task queue impl use zookeeper ");
        return TaskQueueZkImpl.getInstance();
    } else {
        logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
        System.exit(-1);
    }

    return null;
}
```
Since only ZooKeeper is supported, this redundant branch should simply be deleted.
```java
/**
 * add task to tasks queue
 *
 * @param key task queue name
 * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
 */
@Override
public void add(String key, String value) {
    try {
        String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
        String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
        logger.info("add task : {} to tasks queue , result success", result);
    } catch (Exception e) {
        logger.error("add task to tasks queue exception", e);
    }
}
```

- Query the taskInstance by taskInstance.id, which really just fetches its latest state
- Decide from the parameters whether timeout checking is enabled
- Enter an "endless" while loop
- Inside the loop, check whether the task has finished; if so, exit
- Refresh the latest state of the task instance and the process instance
- Sleep one second and enter the next while iteration
Stopper.isRunning() is effectively a global flag controlling a great many threads, each of which sits in its own "endless" while loop. They all sleep between iterations, but it still feels a little wasteful.
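A sketch of a less busy-wait-ish alternative: one shared latch that threads block on with a timeout, so a shutdown wakes everyone immediately instead of waiting out the sleeps. GracefulStopper is a hypothetical helper, not part of DolphinScheduler:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Threads park on the latch instead of Thread.sleep(); stop() releases
// them all at once, so shutdown does not wait for sleeps to elapse.
public final class GracefulStopper {
    private static final CountDownLatch STOP = new CountDownLatch(1);

    public static boolean isRunning() {
        return STOP.getCount() > 0;
    }

    public static void stop() {
        STOP.countDown();
    }

    /** Wait up to the given millis, returning false once stop() was called. */
    public static boolean sleepUnlessStopped(long millis) throws InterruptedException {
        return !STOP.await(millis, TimeUnit.MILLISECONDS);
    }
}
```

A scan loop could then replace Thread.sleep(1000) with GracefulStopper.sleepUnlessStopped(1000) and exit promptly when stop() is called.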

```java
private Runnable heartBeatThread() {
    Runnable heartBeatThread = new Runnable() {
        @Override
        public void run() {
            if (Stopper.isRunning()) {
                // send heartbeat to zk
                if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                    logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                    return;
                }

                zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
            }
        }
    };
    return heartBeatThread;
}
```
```java
public void heartBeatForZk(String znode, String serverType) {
    try {
        // check dead or not in zookeeper
        if (zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)) {
            stoppable.stop("i was judged to death, release resources and stop myself");
            return;
        }

        byte[] bytes = zkClient.getData().forPath(znode);
        String resInfoStr = new String(bytes);
        String[] splits = resInfoStr.split(Constants.COMMA);
        if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
            return;
        }
        String str = splits[0] + Constants.COMMA
                + splits[1] + Constants.COMMA
                + OSUtils.cpuUsage() + Constants.COMMA
                + OSUtils.memoryUsage() + Constants.COMMA
                + OSUtils.loadAverage() + Constants.COMMA
                + splits[5] + Constants.COMMA
                + DateUtils.dateToString(new Date());
        zkClient.setData().forPath(znode, str.getBytes());
    } catch (Exception e) {
        logger.error("heartbeat for zk failed : " + e.getMessage(), e);
        stoppable.stop("heartbeat for zk exception, release resources and stop myself");
    }
}
```
Note that the type of zkMasterClient is ZKMasterClient. So is there also a similar ZKWorkerClient, used in the same way to report the worker nodes' system resource information?
```java
public synchronized void stop(String cause) {

    try {
        // execute only once
        if (Stopper.isStoped()) {
            return;
        }

        logger.info("master server is stopping ..., cause : {}", cause);

        // set stop signal is true
        Stopper.stop();

        try {
            // thread sleep 3 seconds for thread quitely stop
            Thread.sleep(3000L);
        } catch (Exception e) {
            logger.warn("thread sleep exception:" + e.getMessage(), e);
        }

        try {
            heartbeatMasterService.shutdownNow();
        } catch (Exception e) {
            logger.warn("heartbeat service stopped exception");
        }
        logger.info("heartbeat service stopped");

        // close quartz
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (Exception e) {
            logger.warn("Quartz service stopped exception:{}", e.getMessage());
        }
        logger.info("Quartz service stopped");

        try {
            ThreadPoolExecutors.getInstance().shutdown();
        } catch (Exception e) {
            logger.warn("threadpool service stopped exception:{}", e.getMessage());
        }
        logger.info("threadpool service stopped");

        try {
            masterSchedulerService.shutdownNow();
        } catch (Exception e) {
            logger.warn("master scheduler service stopped exception:{}", e.getMessage());
        }
        logger.info("master scheduler service stopped");

        try {
            zkMasterClient.close();
        } catch (Exception e) {
            logger.warn("zookeeper service stopped exception:{}", e.getMessage());
        }
        logger.info("zookeeper service stopped");

    } catch (Exception e) {
        logger.error("master server stop exception : " + e.getMessage(), e);
        System.exit(-1);
    }
}
```
- Stopper.stop(): flip the loop flag for all threads
- Sleep 3 seconds
- heartbeatMasterService.shutdownNow
- QuartzExecutors.getInstance().shutdown
- ThreadPoolExecutors.getInstance().shutdown
- masterSchedulerService.shutdownNow
- zkMasterClient.close
```java
// start QuartzExecutors
// what system should do if exception
try {
    ProcessScheduleJob.init(processDao);
    QuartzExecutors.getInstance().start();
} catch (Exception e) {
    try {
        QuartzExecutors.getInstance().shutdown();
    } catch (SchedulerException e1) {
        logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
    }
    logger.error("start Quartz failed : " + e.getMessage(), e);
}

/**
 * register hooks, which are called before the process exits
 */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        if (zkMasterClient.getActiveMasterNum() <= 1) {
            for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) {
                zkMasterClient.getAlertDao().sendServerStopedAlert(1, OSUtils.getHost(), "Master-Server");
            }
        }
        stop("shutdownhook");
    }
}));
```
```java
public static void init(ProcessDao processDao) {
    ProcessScheduleJob.processDao = processDao;
}
```
```java
@Override
public void run() {
    while (Stopper.isRunning()) {
        InterProcessMutex mutex = null;
        try {
            ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
            // check memory and cpu usage and threads
            boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);

            Thread.sleep(Constants.SLEEP_TIME_MILLIS);

            if (!runCheckFlag) {
                continue;
            }

            // whether have tasks, if no tasks, no need lock
            // get all tasks
            List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
            if (CollectionUtils.isEmpty(tasksQueueList)) {
                continue;
            }

            // creating distributed locks, lock path /dolphinscheduler/lock/worker
            mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
                    zkWorkerClient.getWorkerLockPath());

            // task instance id str
            List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);

            for (String taskQueueStr : taskQueueStrArr) {
                if (StringUtils.isEmpty(taskQueueStr)) {
                    continue;
                }

                if (!checkThreadCount(poolExecutor)) {
                    break;
                }

                // get task instance id
                taskInstId = getTaskInstanceId(taskQueueStr);

                // mainly to wait for the master insert task to succeed
                waitForTaskInstance();

                taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);

                // verify task instance is null
                if (verifyTaskInstanceIsNull(taskInstance)) {
                    logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
                    removeNodeFromTaskQueue(taskQueueStr);
                    continue;
                }

                Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
                        taskInstance.getProcessDefine().getUserId());

                // verify tenant is null
                if (verifyTenantIsNull(tenant)) {
                    logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
                    removeNodeFromTaskQueue(taskQueueStr);
                    continue;
                }

                // set queue for process instance, user-specified queue takes precedence over tenant queue
                String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
                taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
                taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());

                logger.info("worker fetch taskId : {} from queue ", taskInstId);

                if (!checkWorkerGroup(taskInstance, OSUtils.getHost())) {
                    continue;
                }

                // local execute path
                String execLocalPath = getExecLocalPath();
                logger.info("task instance local execute path : {} ", execLocalPath);

                // init task
                taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);

                // check and create Linux users
                FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
                        tenant.getTenantCode(), logger);

                logger.info("task : {} ready to submit to task scheduler thread", taskInstId);

                // submit task
                workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));

                // remove node from zk
                removeNodeFromTaskQueue(taskQueueStr);
            }

        } catch (Exception e) {
            logger.error("fetch task thread failure", e);
        } finally {
            AbstractZKClient.releaseMutex(mutex);
        }
    }
}
```
A really nasty way to write this: it computes the runCheckFlag first, sleeps, and only then checks the value.
Shouldn't something like a hasTask method be used here? The code only needs to know whether any task exists, so fetching the entire task list is inappropriate and somewhat wasteful of memory.
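As an illustration, a check like the following could answer "is the queue non-empty?" without downloading any task data. hasTask is a hypothetical method here, not part of the real ITaskQueue interface:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;

// Ask ZooKeeper for the child count instead of reading every task node.
public class TaskQueueProbe {
    private final CuratorFramework zkClient;
    private final String tasksPath; // e.g. "/dolphinscheduler/tasks_queue"

    public TaskQueueProbe(CuratorFramework zkClient, String tasksPath) {
        this.zkClient = zkClient;
        this.tasksPath = tasksPath;
    }

    public boolean hasTask() throws Exception {
        Stat stat = zkClient.checkExists().forPath(tasksPath);
        // the Stat already carries the number of children; no data transfer needed
        return stat != null && stat.getNumChildren() > 0;
    }
}
```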
In fact one could build a priority-aware distributed lock on top of ZooKeeper: each client sets a weight when requesting the lock, and a higher weight raises its chance of winning it.
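A rough sketch of one deterministic variant of that idea (higher priority always wins rather than just being more likely to): waiters create ephemeral sequential nodes whose names start with a zero-padded priority, so sorting the children orders them by priority first and arrival order second. This is illustrative only and skips edge cases such as session loss:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

// Illustrative priority lock: children sort by zero-padded priority, then
// by ZooKeeper's sequence number, and the first child holds the lock.
public class PriorityZkLock {
    private final CuratorFramework client;
    private final String lockPath;
    private String ourNode;

    public PriorityZkLock(CuratorFramework client, String lockPath) {
        this.client = client;
        this.lockPath = lockPath;
    }

    /** priority in [0, 99]; higher priority sorts earlier. */
    public void acquire(int priority) throws Exception {
        String prefix = String.format("lock-%02d-", 99 - priority);
        ourNode = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(lockPath + "/" + prefix);
        String ourName = ourNode.substring(lockPath.length() + 1);

        while (true) {
            List<String> children = client.getChildren().forPath(lockPath);
            Collections.sort(children);
            if (children.get(0).equals(ourName)) {
                return; // first in line: lock acquired
            }
            // wait for the waiter directly ahead of us to disappear
            String prev = lockPath + "/" + children.get(children.indexOf(ourName) - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = client.checkExists()
                    .usingWatcher((CuratorWatcher) event -> latch.countDown())
                    .forPath(prev);
            if (stat != null) {
                latch.await();
            }
        }
    }

    public void release() throws Exception {
        client.delete().guaranteed().forPath(ourNode);
    }
}
```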
```
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2...
```
- Check whether taskQueueStr is empty, which feels a bit redundant
- Check whether enough threads are available
- Extract the task ID from taskQueueStr: the fourth field after splitting on `_` (see the parsing sketch after this list)
- Wait for the task instance row to land in the database, looping up to 30 times with a one-second wait each. The comment says the database operation can be delayed; it is not clear where that delay would come from
- Fetch the task instance details by the task ID
- Fetch the tenant from the task instance
- Fetch the user queue from the task instance. Why not fetch this together with the task instance details, or in one go inside getTaskInstanceDetailByTaskId?
- Check whether the task instance may run on this node; if not, move on to the next task. Why isn't this checked earlier, instead of only after two DB queries?
- Initialize the task instance
- Check that the working directory and the user exist, and create them if absent. Why not create them ahead of time instead of re-checking on every run?
- Submit the task to a TaskScheduleThread for execution
- Delete the corresponding task node from the taskQueue
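For concreteness, a tiny sketch of pulling the fields out of the queue string format shown above (all values are made up):

```java
import java.util.Arrays;
import java.util.List;

// Split "${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2"
public class TaskQueueStringDemo {
    public static void main(String[] args) {
        String taskQueueStr = "1_100_2_305_192.168.0.1,192.168.0.2";
        String[] parts = taskQueueStr.split("_");

        int processInstanceId = Integer.parseInt(parts[1]);
        int taskInstanceId = Integer.parseInt(parts[3]);   // the fourth field
        List<String> hosts = Arrays.asList(parts[4].split(","));

        System.out.printf("process=%d task=%d hosts=%s%n",
                processInstanceId, taskInstanceId, hosts);
    }
}
```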
```java
@Override
public void run() {

    try {
        // update task state is running according to task type
        updateTaskState(taskInstance.getTaskType());

        logger.info("script path : {}", taskInstance.getExecutePath());
        // task node
        TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);

        // copy hdfs/minio file to local
        copyHdfsToLocal(processDao,
                taskInstance.getExecutePath(),
                createProjectResFiles(taskNode),
                logger);

        // get process instance according to tak instance
        ProcessInstance processInstance = taskInstance.getProcessInstance();

        // set task props
        TaskProps taskProps = new TaskProps(taskNode.getParams(),
                taskInstance.getExecutePath(),
                processInstance.getScheduleTime(),
                taskInstance.getName(),
                taskInstance.getTaskType(),
                taskInstance.getId(),
                CommonUtils.getSystemEnvPath(),
                processInstance.getTenantCode(),
                processInstance.getQueue(),
                taskInstance.getStartTime(),
                getGlobalParamsMap(),
                taskInstance.getDependency(),
                processInstance.getCmdTypeIfComplement());
        // set task timeout
        setTaskTimeout(taskProps, taskNode);

        taskProps.setTaskAppId(String.format("%s_%s_%s",
                taskInstance.getProcessDefine().getId(),
                taskInstance.getProcessInstance().getId(),
                taskInstance.getId()));

        // custom logger
        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefine().getId(),
                taskInstance.getProcessInstance().getId(),
                taskInstance.getId()));

        task = TaskManager.newTask(taskInstance.getTaskType(),
                taskProps,
                taskLogger);

        // task init
        task.init();

        // task handle
        task.handle();

        // task result process
        task.after();

    } catch (Exception e) {
        logger.error("task scheduler failure", e);
        kill();
        // update task instance state
        processDao.changeTaskState(ExecutionStatus.FAILURE,
                new Date(),
                taskInstance.getId());
    }

    logger.info("task instance id : {},task final status : {}",
            taskInstance.getId(),
            task.getExitStatus());
    // update task instance state
    processDao.changeTaskState(task.getExitStatus(),
            new Date(),
            taskInstance.getId());
}
```
As before, let's walk through the run method step by step.
- Update the task state to ExecutionStatus.RUNNING_EXEUTION
- Read the task node info from the task instance
- Copy files from HDFS to local disk, including user-uploaded resources such as JARs, SQL files, configuration files and so on
- Build a TaskProps object
- Initialize the task logger
- Build an AbstractTask instance
- Call the AbstractTask's init, handle and after in turn
- Update the task instance's state: failure on exception, success, etc.
```java
public static AbstractTask newTask(String taskType, TaskProps props, Logger logger)
        throws IllegalArgumentException {
    switch (EnumUtils.getEnum(TaskType.class, taskType)) {
        case SHELL:
            return new ShellTask(props, logger);
        case PROCEDURE:
            return new ProcedureTask(props, logger);
        case SQL:
            return new SqlTask(props, logger);
        case MR:
            return new MapReduceTask(props, logger);
        case SPARK:
            return new SparkTask(props, logger);
        case FLINK:
            return new FlinkTask(props, logger);
        case PYTHON:
            return new PythonTask(props, logger);
        case DEPENDENT:
            return new DependentTask(props, logger);
        case HTTP:
            return new HttpTask(props, logger);
        default:
            logger.error("unsupport task type: {}", taskType);
            throw new IllegalArgumentException("not support task type");
    }
}
```
```java
public ShellTask(TaskProps taskProps, Logger logger) {
    super(taskProps, logger);

    this.taskDir = taskProps.getTaskDir();

    this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
            taskProps.getTaskDir(),
            taskProps.getTaskAppId(),
            taskProps.getTaskInstId(),
            taskProps.getTenantCode(),
            taskProps.getEnvFile(),
            taskProps.getTaskStartTime(),
            taskProps.getTaskTimeout(),
            logger);
    this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
```
```java
public void handle() throws Exception {
    try {
        // construct process
        exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
    } catch (Exception e) {
        logger.error("shell task failure", e);
        exitStatusCode = -1;
    }
}
```
```java
private void buildProcess(String commandFile) throws IOException {
    // init process builder
    ProcessBuilder processBuilder = new ProcessBuilder();
    // setting up a working directory
    processBuilder.directory(new File(taskDir));
    // merge error information to standard output stream
    processBuilder.redirectErrorStream(true);
    // setting up user to run commands
    processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile);

    process = processBuilder.start();

    // print command
    printCommand(processBuilder);
}
```
```java
public void handle() {
    // set the name of the current thread
    String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
    Thread.currentThread().setName(threadLoggerInfoName);

    try {
        TaskInstance taskInstance = null;
        while (Stopper.isRunning()) {
            taskInstance = processDao.findTaskInstanceById(this.taskProps.getTaskInstId());

            if (taskInstance == null) {
                exitStatusCode = -1;
                break;
            }

            if (taskInstance.getState() == ExecutionStatus.KILL) {
                this.cancel = true;
            }

            if (this.cancel || allDependentTaskFinish()) {
                break;
            }

            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }

        if (cancel) {
            exitStatusCode = Constants.EXIT_CODE_KILL;
        } else {
            DependResult result = getTaskDependResult();
            exitStatusCode = (result == DependResult.SUCCESS) ?
                    Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        exitStatusCode = -1;
    }
}
```
- Fetch the latest task instance info by its id
- If its state is KILL, exit
- If all dependent tasks have finished, exit
- Sleep one second and enter the next iteration
```java
private boolean allDependentTaskFinish() {
    boolean finish = true;
    for (DependentExecute dependentExecute : dependentTaskList) {
        for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
            if (!dependResultMap.containsKey(entry.getKey())) {
                dependResultMap.put(entry.getKey(), entry.getValue());
                // save depend result to log
                logger.info("dependent item complete {} {},{}",
                        DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
            }
        }
        if (!dependentExecute.finish(dependentDate)) {
            finish = false;
        }
    }
    return finish;
}
```
```java
if (taskProps.getScheduleTime() != null) {
    this.dependentDate = taskProps.getScheduleTime();
} else {
    this.dependentDate = taskProps.getTaskStartTime();
}
```
```java
/**
 * calculate dependent result for one dependent item.
 * @param dependentItem dependent item
 * @param dateIntervals date intervals
 * @return dateIntervals
 */
private DependResult calculateResultForTasks(DependentItem dependentItem,
                                             List<DateInterval> dateIntervals) {
    DependResult result = DependResult.FAILED;
    for (DateInterval dateInterval : dateIntervals) {
        ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval);
        if (processInstance == null) {
            logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}",
                    dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime());
            return DependResult.FAILED;
        }
        if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
            result = getDependResultByState(processInstance.getState());
        } else {
            TaskInstance taskInstance = null;
            List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());

            for (TaskInstance task : taskInstanceList) {
                if (task.getName().equals(dependentItem.getDepTasks())) {
                    taskInstance = task;
                    break;
                }
            }
            if (taskInstance == null) {
                // cannot find task in the process instance
                // maybe because process instance is running or failed.
                result = getDependResultByState(processInstance.getState());
            } else {
                result = getDependResultByState(taskInstance.getState());
            }
        }
        if (result != DependResult.SUCCESS) {
            break;
        }
    }
    return result;
}
```
- Call Stopper.stop to set the global flag and break every thread's "endless" loop
- Sleep 3 seconds
- Stop the worker heartbeat: heartbeatWorkerService.shutdownNow
- Stop the worker task thread pool: ThreadPoolExecutors.getInstance().shutdown
- Stop the killExecutor pool: killExecutorService.shutdownNow
- Stop the fetchTask pool: fetchTaskExecutorService.shutdownNow
- Stop the ZooKeeper client: zkWorkerClient.close
```java
private ZKWorkerClient() {
    init();
}

/**
 * init
 */
private void init() {

    // init system znode
    this.initSystemZNode();

    // monitor worker
    this.listenerWorker();

    // register worker
    this.registWorker();
}
```
```java
protected void initSystemZNode() {
    try {
        createNodePath(getMasterZNodeParentPath());
        createNodePath(getWorkerZNodeParentPath());
        createNodePath(getDeadZNodeParentPath());
    } catch (Exception e) {
        logger.error("init system znode failed : " + e.getMessage(), e);
    }
}

private void createNodePath(String zNodeParentPath) throws Exception {
    if (null == zkClient.checkExists().forPath(zNodeParentPath)) {
        zkClient.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(zNodeParentPath);
    }
}
```
```java
public String registerServer(ZKNodeType zkNodeType) throws Exception {
    String registerPath = null;
    String host = OSUtils.getHost();
    if (checkZKNodeExists(host, zkNodeType)) {
        logger.error("register failure , {} server already started on host : {}",
                zkNodeType.toString(), host);
        return registerPath;
    }
    registerPath = createZNodePath(zkNodeType);

    // handle dead server
    handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);

    return registerPath;
}
```
```java
private void listenerWorker() {
    workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
    try {
        workerPathChildrenCache.start();
        workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        logger.info("node added : {}", event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        String path = event.getData().getPath();
                        // find myself dead
                        String serverHost = getHostByEventDataPath(path);
                        if (checkServerSelfDead(serverHost, ZKNodeType.WORKER)) {
                            return;
                        }
                        break;
                    case CHILD_UPDATED:
                        break;
                    default:
                        break;
                }
            }
        });
    } catch (Exception e) {
        logger.error("monitor worker failed : " + e.getMessage(), e);
    }
}
```
```java
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
    if (serverHost.equals(OSUtils.getHost())) {
        logger.error("{} server({}) of myself dead , stopping...",
                zkNodeType.toString(), serverHost);
        stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
                zkNodeType.toString(), serverHost));
        return true;
    }
    return false;
}
```
```java
zkWorkerClient.setStoppable(this);
```
```java
private ZKMasterClient(ProcessDao processDao) {
    this.processDao = processDao;
    init();
}

public void init() {
    // init dao
    this.initDao();

    InterProcessMutex mutex = null;
    try {
        // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
        String znodeLock = getMasterStartUpLockPath();
        mutex = new InterProcessMutex(zkClient, znodeLock);
        mutex.acquire();

        // init system znode
        this.initSystemZNode();

        // monitor master
        this.listenerMaster();

        // monitor worker
        this.listenerWorker();

        // register master
        this.registerMaster();

        // check if fault tolerance is required,failure and tolerance
        if (getActiveMasterNum() == 1) {
            failoverWorker(null, true);
            failoverMaster(null);
        }

    } catch (Exception e) {
        logger.error("master start up exception : " + e.getMessage(), e);
    } finally {
        releaseMutex(mutex);
    }
}
```
- initDao: this just initializes alertDao by calling DaoFactory.getDaoInstance(AlertDao.class). A really ugly initialization pattern: processDao is injected from outside, yet alertDao is created like this
- Acquire a distributed lock on the path /dolphinscheduler/lock/failover/master
- Once the lock is acquired, call initSystemZNode, listenerMaster, listenerWorker and registerMaster in turn
- If the number of active masters is 1, perform failover. I don't yet know exactly why; presumably, when the newly started master is the only one alive, any instances still assigned to dead servers would otherwise never be taken over
```java
private void failoverMaster(String masterHost) {
    logger.info("start master failover ...");

    List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);

    // updateProcessInstance host is null and insert into command
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        processDao.processNeedFailoverProcessInstances(processInstance);
    }

    logger.info("master failover end");
}
```
```java
@Transactional(rollbackFor = Exception.class)
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
    // 1 update processInstance host is null
    processInstance.setHost("null");
    processInstanceMapper.updateById(processInstance);

    // 2 insert into recover command
    Command cmd = new Command();
    cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
    cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
    cmd.setExecutorId(processInstance.getExecutorId());
    cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
    createCommand(cmd);
}
```

```java
Command command = new Command();
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
command.setProcessDefinitionId(schedule.getProcessDefinitionId());
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroupId(schedule.getWorkerGroupId());
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
```

This logic really ought to be handled; otherwise the instance genuinely stays in the running state forever.
```java
/**
 * failover worker tasks
 *
 * 1. kill yarn job if there are yarn jobs in tasks.
 * 2. change task state from running to need failover.
 * 3. failover all tasks when workerHost is null
 * @param workerHost worker host
 * @param needCheckWorkerAlive need check worker alive
 * @throws Exception exception
 */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
    logger.info("start worker[{}] failover ...", workerHost);

    List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
    for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
        if (needCheckWorkerAlive) {
            if (!checkTaskInstanceNeedFailover(taskInstance)) {
                continue;
            }
        }

        ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
        if (instance != null) {
            taskInstance.setProcessInstance(instance);
        }
        // only kill yarn job if exists , the local thread has exited
        ProcessUtils.killYarnJob(taskInstance);

        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
        processDao.saveTaskInstance(taskInstance);
    }
    logger.info("end worker[{}] failover ...", workerHost);
}
```

```java
/**
 * determine if you can try again
 * @return can try result
 */
public boolean taskCanRetry() {
    if (this.isSubProcess()) {
        return false;
    }
    if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
        return true;
    } else {
        return (this.getState().typeIsFailure()
                && this.getRetryTimes() < this.getMaxRetryTimes());
    }
}
```
In taskCanRetry, a task in the ExecutionStatus.NEED_FAULT_TOLERANCE state can always be retried, no matter how many times it has already been retried. What is that for? Presumably, fault-tolerance reruns are caused by a node failure rather than by the task itself failing, so they shouldn't eat into the task's retry budget.
```java
if (task.getState().typeIsFailure()) {
    if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
        this.recoverToleranceFaultTaskList.add(task);
    }
    if (task.taskCanRetry()) {
        addTaskToStandByList(task);
    } else {
        // node failure, based on failure strategy
        errorTaskList.put(task.getName(), task);
        completeTaskList.put(task.getName(), task);
        if (processInstance.getFailureStrategy() == FailureStrategy.END) {
            killTheOtherTasks();
        }
    }
    continue;
}
```
- If a worker's ZooKeeper connection times out, it stops its heartbeat, stops fetching tasks, waits for all running task instances to finish (successfully or not) and updates their state in the database
- If a master's ZooKeeper connection times out, it stops its heartbeat, stops fetching process instances and stops scheduling all of its process instances
- If a master finds a failed task instance in the ExecutionStatus.NEED_FAULT_TOLERANCE state within a process instance, it reruns it




