
1
Introduction to Apache DolphinScheduler
2
DAG background
A graph is formed by vertices and by edges connecting pairs of vertices, where the
vertices can be any kind of object that is connected in pairs by edges.
In the case of a directed graph, each edge has an orientation, from one vertex to another vertex. A path in a directed graph is a sequence of edges having the property that the ending vertex of each
edge in the sequence is the same as the starting vertex of the next edge in the sequence; a path forms a cycle if the starting vertex of its first edge equals the ending vertex of its last edge.
A directed acyclic graph is a directed graph that has no cycles.[1][2][3]
A vertex v of a directed graph is said to be reachable from another
vertex u when there exists a path that starts at u and ends at v.
As a special case, every vertex is considered to be reachable from itself (by a path with zero edges). If a vertex can reach itself via a nontrivial path (a path with one or more edges), then that path is a cycle, so another way to define directed acyclic graphs is that they are the graphs in which no vertex can reach itself via a nontrivial path.
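vertex
An entity or element; it can be any abstract object.
edge
A directed line that connects two vertices, one acting as its start point and the other as its end point.
DAG constraints
In a DAG, the end vertex of one edge (a, b) can be the start vertex of another edge (b, c). Every vertex along such a chain is reachable, so c is reachable from a.
A DAG allows a vertex that does not belong to any edge. Such a vertex can still reach itself (it is an island with no connection to any other vertex).
If a vertex can reach itself by passing through other vertices, the graph contains a cycle.
A DAG contains no cycles.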
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DAG<Node, NodeInfo, EdgeInfo> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * node map, key is node, value is node information
     */
    private final Map<Node, NodeInfo> nodesMap;

    /**
     * edge map. key is node of origin; value is Map with key for destination node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    /**
     * reversed edge set, key is node of destination, value is Map with key for origin node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;

    // constructor and graph operations are omitted in this excerpt
}
Node is the id of a task.
NodeInfo is the detailed information about the task.
EdgeInfo holds the task id and the id of the task it depends on.
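To make the DAG constraints concrete, here is a minimal sketch of a graph that keeps forward and reverse edge maps, as the DAG class above does, and refuses any edge that would close a cycle. The class name SimpleDag and all of its methods are hypothetical and written only for illustration; this is not the DolphinScheduler implementation, and it drops the NodeInfo and EdgeInfo payloads for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified DAG: nodes of type N, no node or edge payloads. */
class SimpleDag<N> {
    // forward edges: origin -> destinations
    private final Map<N, Set<N>> edges = new LinkedHashMap<>();
    // reverse edges: destination -> origins
    private final Map<N, Set<N>> reverseEdges = new LinkedHashMap<>();

    /** Registers a node; an isolated node is a valid member of the graph. */
    void addNode(N node) {
        edges.computeIfAbsent(node, k -> new LinkedHashSet<>());
        reverseEdges.computeIfAbsent(node, k -> new LinkedHashSet<>());
    }

    /**
     * Adds from -> to unless it would close a cycle. Returns whether the edge was added.
     * Both nodes stay registered even when the edge is rejected.
     */
    boolean addEdge(N from, N to) {
        addNode(from);
        addNode(to);
        // from -> to closes a cycle exactly when from is already reachable from to.
        // This also rejects self-loops, because every node reaches itself.
        if (isReachable(to, from)) {
            return false;
        }
        edges.get(from).add(to);
        reverseEdges.get(to).add(from);
        return true;
    }

    /** Breadth-first search over the forward edges. */
    boolean isReachable(N start, N target) {
        Deque<N> queue = new ArrayDeque<>();
        Set<N> seen = new HashSet<>();
        queue.add(start);
        seen.add(start);
        while (!queue.isEmpty()) {
            N current = queue.poll();
            if (current.equals(target)) {
                return true;
            }
            for (N next : edges.getOrDefault(current, Collections.<N>emptySet())) {
                if (seen.add(next)) {
                    queue.add(next);
                }
            }
        }
        return false;
    }

    /** Kahn's algorithm: repeatedly emit nodes whose remaining in-degree is zero. */
    List<N> topologicalOrder() {
        Map<N, Integer> inDegree = new HashMap<>();
        Deque<N> ready = new ArrayDeque<>();
        for (N node : edges.keySet()) {
            int degree = reverseEdges.get(node).size();
            inDegree.put(node, degree);
            if (degree == 0) {
                ready.add(node);
            }
        }
        List<N> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            N node = ready.poll();
            order.add(node);
            for (N next : edges.get(node)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        return order;
    }
}
```

Checking reachability on every insert keeps the structure acyclic at all times, at the cost of one graph search per added edge. The reverse map makes the in-degree lookup for the topological order a constant-time read.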
3
Data warehouse build tasks and task dependencies


4
DolphinScheduler system roles
5
DolphinScheduler task scheduling flow


Command dispatch flow
Processing approach
Producer
A sample command that starts a workflow instance
{
"commandType": "START_PROCESS",
"processDefinitionCode": 14285512555584,
"executorId": 1,
"commandParam": "{}",
"taskDependType": "TASK_POST",
"failureStrategy": "CONTINUE",
"warningType": "NONE",
"startTime": 1723444881372,
"processInstancePriority": "MEDIUM",
"updateTime": 1723444881372,
"workerGroup": "default",
"tenantCode": "default",
"environmentCode": -1,
"dryRun": 0,
"processInstanceId": 0,
"processDefinitionVersion": 1,
"testFlag": 0
}
Consumer
Query statement
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
order by process_instance_priority, id asc
limit #{limit}
</select>
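The predicate id % masterCount = thisMasterSlot in the query above splits commands across masters so that each command id maps to exactly one slot. The sketch below reproduces only that arithmetic in memory. The class and method names are hypothetical, it assumes masterCount is positive, and it sorts by id alone, whereas the real query orders by process_instance_priority first. How each master learns masterCount and its own slot is not covered here.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory stand-in for the slot predicate used in the SQL query. */
class CommandSlotDemo {

    /** Mirrors the SQL condition: id % masterCount = thisMasterSlot. */
    static boolean belongsToSlot(long commandId, int masterCount, int thisMasterSlot) {
        return commandId % masterCount == thisMasterSlot;
    }

    /** Returns at most limit command ids owned by the given slot, in ascending id order. */
    static List<Long> pageForSlot(List<Long> commandIds, int masterCount, int thisMasterSlot, int limit) {
        return commandIds.stream()
                .filter(id -> belongsToSlot(id, masterCount, thisMasterSlot))
                .sorted()
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

With three masters and command ids 1 to 10, slot 0 owns ids 3, 6, 9, slot 1 owns 1, 4, 7, 10, and slot 2 owns 2, 5, 8, so no two masters pick up the same command.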
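MasterSchedulerBootstrap polls in a loop for pending commands. For each command it combines the command with the master host to build a ProcessInstance and inserts that ProcessInstance into the t_ds_process_instance table. At the same time it creates an executable task, workflowExecuteRunnable, which carries the context needed to run.
It then caches the workflowExecuteRunnable in the local cache processInstanceExecCacheManager and puts a WorkflowEventType.START_WORKFLOW event for the ProcessInstance onto the workflowEventQueue queue.
DAG traversal and task execution
Master local cache buffer
ProcessInstanceExecCacheManagerImpl provides the following core functions: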
public interface ProcessInstanceExecCacheManager {

    /**
     * get WorkflowExecuteThread by process instance id
     *
     * @param processInstanceId processInstanceId
     * @return WorkflowExecuteThread
     */
    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);

    /**
     * judge the process instance does it exist
     *
     * @param processInstanceId processInstanceId
     * @return true - if process instance id exists in cache
     */
    boolean contains(int processInstanceId);

    /**
     * remove cache by process instance id
     *
     * @param processInstanceId processInstanceId
     */
    void removeByProcessInstanceId(int processInstanceId);

    /**
     * cache
     *
     * @param processInstanceId     processInstanceId
     * @param workflowExecuteThread if it is null, will not be cached
     */
    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);

    /**
     * get all WorkflowExecuteThread from cache
     *
     * @return all WorkflowExecuteThread in cache
     */
    Collection<WorkflowExecuteRunnable> getAll();

    void clearCache();
}
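One plausible way to back an interface like this is a concurrent map keyed by process instance id. The sketch below is an assumption for illustration, not the actual ProcessInstanceExecCacheManagerImpl: the class name InMemoryExecCache is hypothetical, and it is generic over the cached type R so that it compiles without the DolphinScheduler classes.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory cache keyed by process instance id; R stands in for the runnable type. */
class InMemoryExecCache<R> {
    // ConcurrentHashMap lets the scheduling loop and event threads share the cache safely.
    private final Map<Integer, R> cache = new ConcurrentHashMap<>();

    R getByProcessInstanceId(int processInstanceId) {
        return cache.get(processInstanceId);
    }

    boolean contains(int processInstanceId) {
        return cache.containsKey(processInstanceId);
    }

    void removeByProcessInstanceId(int processInstanceId) {
        cache.remove(processInstanceId);
    }

    /** Rejects null values outright instead of silently skipping them. */
    void cache(int processInstanceId, R runnable) {
        cache.put(processInstanceId, Objects.requireNonNull(runnable, "runnable must not be null"));
    }

    /** Read-only view of everything currently cached. */
    Collection<R> getAll() {
        return Collections.unmodifiableCollection(cache.values());
    }

    void clearCache() {
        cache.clear();
    }
}
```

A typical sequence is cache(id, runnable) when the workflow instance is created, getByProcessInstanceId(id) whenever an event for that instance arrives, and removeByProcessInstanceId(id) once the workflow finishes.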
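Producer
The MasterSchedulerBootstrap loop transforms each command into a runnable task; the task object contains all the context needed to process it.
Consumer
Using the DAG information, EventExecuteService takes the first batch of TaskInstance objects that have no dependencies and adds them to the standby queue standByTaskInstancePriorityQueue. Tasks in standByTaskInstancePriorityQueue are handled in priority order: their state is processed and the tasks that are ready to run are submitted to the globalTaskDispatchWaitingQueue queue.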
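Dispatching executable tasks
In-process priority queue on the master
Entries in globalTaskDispatchWaitingQueue are already the smallest unit of executable task.
Producer
Starting from the parent node, EventExecuteService traverses the DAG breadth-first and submits tasks to the globalTaskDispatchWaitingQueue queue.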
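The breadth-first submission described above can be sketched with a pending-parent counter per task: a task becomes ready once every parent has finished. The names below (ReadyTaskWalker and its methods) are hypothetical, tasks are plain strings, and every task is assumed to succeed. The real scheduler also deals with priorities, failures, and retries, none of which are modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical walker that releases a task once all of its parents have finished. */
class ReadyTaskWalker {
    private final Map<String, List<String>> children = new HashMap<>();
    private final Map<String, Integer> pendingParents = new HashMap<>();

    void addTask(String task) {
        children.computeIfAbsent(task, k -> new ArrayList<>());
        pendingParents.putIfAbsent(task, 0);
    }

    void addDependency(String parent, String child) {
        addTask(parent);
        addTask(child);
        children.get(parent).add(child);
        pendingParents.merge(child, 1, Integer::sum);
    }

    /** First batch: tasks without any parent, sorted by name for a stable order. */
    List<String> initialTasks() {
        List<String> roots = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : pendingParents.entrySet()) {
            if (entry.getValue() == 0) {
                roots.add(entry.getKey());
            }
        }
        Collections.sort(roots);
        return roots;
    }

    /** Called when a task finishes; returns the children that just became ready. */
    List<String> onTaskFinished(String task) {
        List<String> ready = new ArrayList<>();
        for (String child : children.getOrDefault(task, Collections.emptyList())) {
            if (pendingParents.merge(child, -1, Integer::sum) == 0) {
                ready.add(child);
            }
        }
        return ready;
    }

    /** Simulates a full run and returns the submission order. Consumes the counters, so call once. */
    List<String> runAll() {
        Deque<String> dispatchQueue = new ArrayDeque<>(initialTasks());
        List<String> submitted = new ArrayList<>();
        while (!dispatchQueue.isEmpty()) {
            String task = dispatchQueue.poll();
            submitted.add(task);
            dispatchQueue.addAll(onTaskFinished(task));
        }
        return submitted;
    }
}
```

For a diamond-shaped workflow where a feeds b and c, and both feed d, the walker submits a first, then b and c, and releases d only after both b and c have finished.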
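Consumer
GlobalTaskDispatchWaitingQueueLooper consumes the tasks waiting to be dispatched and dispatches each one according to its task type. Dispatch goes through an RPC interface. At present there appear to be two dispatchers, chosen by task type:
MasterTaskDispatcher
WorkerTaskDispatcher
In the case of WorkerTaskDispatcher, once the RPC server receives the RPC request it submits the task to workerTaskExecutorThreadPool for execution. Task handling is therefore asynchronous, so the master server does not hang at this point. Execution progress is reported back through callbacks at key points.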
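Task execution status callbacks
Producer
TaskExecutionStatus.FAILURE: execution threw an exception and the run failed
TaskExecutionStatus.RUNNING_EXECUTION: execution has started
TaskExecutionStatus.KILL: the task was killed
TaskExecutionStatus.SUCCESS: execution succeeded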
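The worker-side pattern described here, accept the request, run the task on a thread pool, and report status at key points, can be sketched as follows. Everything in this block is a hypothetical stand-in: AsyncDispatchDemo is not a DolphinScheduler class, the RPC layer is replaced by a direct method call, and statuses are plain strings that reuse the TaskExecutionStatus names listed above. The KILL path is not modeled.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical worker that runs tasks asynchronously and reports status through a callback. */
class AsyncDispatchDemo {
    private final ExecutorService workerPool = Executors.newFixedThreadPool(2);

    /**
     * Stand-in for the RPC handler: hands the task to the pool and returns at once,
     * so the caller is never blocked by the task body.
     */
    void dispatch(String taskName, Runnable body, BiConsumer<String, String> statusCallback) {
        workerPool.execute(() -> {
            statusCallback.accept(taskName, "RUNNING_EXECUTION");
            try {
                body.run();
                statusCallback.accept(taskName, "SUCCESS");
            } catch (RuntimeException e) {
                // An exception thrown by the task body is reported as a failed run.
                statusCallback.accept(taskName, "FAILURE");
            }
        });
    }

    /** Stops accepting tasks and waits up to five seconds for running ones to finish. */
    boolean shutdownAndWait() {
        workerPool.shutdown();
        try {
            return workerPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because dispatch only enqueues work, the time the caller spends in it does not depend on how long the task runs; the caller learns the outcome later through the callback.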

Consumer
The ITaskInstanceExecutionEventListener service receives the RPC requests and adds the task events to the TaskEventService eventQueue queue.
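Task state handling
Buffer queue
The TaskEventService eventQueue queue.
Producer
User actions on the api-server
Task scheduling on the master node
Task execution on the worker node
Task execution on the master
Consumer
The TaskInstanceListenerImpl service transforms each TaskEvent into a TaskExecuteRunnable and submits it to the thread pool, where it waits in taskExecuteThreadMap to be executed. The task's execution state is updated inside the thread pool.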
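As a rough illustration of this last stage, the sketch below drains an event queue, wraps each event in a Runnable, and applies the state change on an executor. The names TaskEventPipelineDemo and TaskEvent are hypothetical simplifications, statuses are plain strings, and a single-threaded executor is used so that events are applied in arrival order. How the real TaskExecuteRunnable and taskExecuteThreadMap organize this work is not shown in the article, so treat this only as one possible shape.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical pipeline: event queue -> runnable per event -> state update on an executor. */
class TaskEventPipelineDemo {

    /** Hypothetical event: which task instance changed and what its new status is. */
    static final class TaskEvent {
        final int taskInstanceId;
        final String status;

        TaskEvent(int taskInstanceId, String status) {
            this.taskInstanceId = taskInstanceId;
            this.status = status;
        }
    }

    private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
    private final Map<Integer, String> taskStates = new ConcurrentHashMap<>();
    // A single thread keeps updates in arrival order; a real system may shard by task instead.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Producer side: any component may publish a status event. */
    void publish(TaskEvent event) {
        eventQueue.add(event);
    }

    /** Consumer side: turn each queued event into a Runnable that applies the state change. */
    void drain() {
        TaskEvent event;
        while ((event = eventQueue.poll()) != null) {
            final TaskEvent current = event;
            executor.execute(() -> taskStates.put(current.taskInstanceId, current.status));
        }
    }

    String stateOf(int taskInstanceId) {
        return taskStates.get(taskInstanceId);
    }

    /** Stops the executor and waits up to five seconds for pending updates. */
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Decoupling publishers from the state update through a queue means a slow update never blocks the component that reported the event.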

This article is reposted from 海豚调度.




