介绍
XXL-JOB任务调度平台分为2个部分,Scheduler和Executor。具体的实现Scheduler对应是xxl-job-admin,同时xxl-job-admin还配有web UI,可以配置管理任务。
Scheduler和Executor之间通过HTTP API交互,因此Executor可以通过各种语言实现。

以上图为例,scheduleThread将任务通过Executor的/run api推送给Executor
{"jobId": 3,"executorHandler": "task.test","executorParams": "x=100","executorBlockStrategy": "SERIAL_EXECUTION","executorTimeout": 0,"logId": 17,"logDateTime": 1606100913829,"glueType": "BEAN","glueSource": "","glueUpdatetime": 1606099566000,"broadcastIndex": 0,"broadcastTotal": 1}
Executor会根据executorHandler找到对应的handler,执行完之后,又会调用xxl-job-admin的/xxl-job-admin/api/callback回报任务的执行结果。从上面的描述我们可以知道,xxl-job-admin和excutor都必须暴露出api服务(都是HTTP接口)。
Scheduler可以有多个。它们之间通过MySQL进行同步。
主要的调度逻辑在JobScheduleHelper中
在每一轮执行调度逻辑之前, Scheduler必须先获得行锁
while (!scheduleThreadToStop) {...// 加行锁try {preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();...} catch (Exception e) { ... } finally {...// 注意:锁必须正常释放conn.commit();...}
由于xxl_job_lock 表中只有一条记录,所以这个逻辑与请求表锁类似,开销是比较大的。
其实这里还可以利用分治法的思想,让不同的任务对应到不同的行锁。来提高整体的并发度。依我推测, xxl-job 设计时考虑就是调度任务的数量不会太多。因此性能不是它的最主要关注点。
xxl-job内部没有使用Zookeeper这种数据库,因此在高可用性上与Quartz相比还是会稍微弱一些。好在它依赖少,搭建、学习的成本就会非常低。
对MySQL而言,如果xxl-job-admin在持有行锁的期间发生异常退出,与MySQL的连接断开。一段时间之后,MySQL会自动主动释放这个行锁。因此并不会出现死锁的问题。
接入指南
xxl-job-go-sdk
xxl-job go 客户端
支持
1.执行器注册2.耗时任务取消3.任务注册,像写http.Handler一样方便4.任务panic处理5.阻塞策略处理6.任务完成支持返回执行备注7.任务超时取消 (单位:秒,0为不限制)8.失败重试次数(在参数param中,目前由任务自行处理)9.可自定义日志10.自定义日志查看handler11.支持外部路由(可与gin集成)
How to get
go get github.com/mousycoder/xxl-job-go-sdk
Example
package mainimport ("fmt"xxl “github.com/mousycoder/xxl-job-go-sdk"“github.com/xxl-job-go-sdk/example/task""log")func main() {exec := xxl.NewExecutor(xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),xxl.AccessToken(""), //请求令牌(默认为空)xxl.ExecutorIp("127.0.0.1"), //可自动获取xxl.ExecutorPort("9999"), //默认9999(非必填)xxl.RegistryKey("golang-jobs"), //执行器名称xxl.SetLogger(&logger{}), //自定义日志)exec.Init()//设置日志查看handlerexec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{FromLineNum: req.FromLineNum,ToLineNum: 2,LogContent: "这个是自定义日志handler",IsEnd: true,}}})//注册任务handlerexec.RegTask("task.test", task.Test)exec.RegTask("task.test2", task.Test2)exec.RegTask("task.panic", task.Panic)log.Fatal(exec.Run())}//xxl.Logger接口实现type logger struct{}func (l *logger) Info(format string, a ...interface{}) {fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))}func (l *logger) Error(format string, a ...interface{}) {log.Println(fmt.Sprintf("自定义日志 - "+format, a...))}
xxl-job-admin配置
添加执行器
执行器管理->新增执行器,执行器列表如下:
AppName 名称 注册方式 OnLine 机器地址 操作golang-jobs golang执行器 自动注册 查看 ( 1 )
查看->注册节点
http://127.0.0.1:9999
添加任务
任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)
1 测试panic BEAN:task.panic * 0 * * * ? admin STOP2 测试耗时任务 BEAN:task.test2 * * * * * ? admin STOP3 测试golang BEAN:task.test * * * * * ? admin STOP
SDK 源码地址
https://github.com/mousycoder/xxl-job-go-sdk
SDK 源码解析
1.初始化执行器信息(令牌,执行器名称,IP,端口,调度中心地址)
exec := xxl.NewExecutor(xxl.ServerAddr("http://xxl-job.test.com:18088/xxl-job-admin"),xxl.AccessToken(“xxxxx"), //请求令牌(默认为空)//xxl.ExecutorIp("127.0.0.1"), 可自动获取xxl.ExecutorPort("9999"), //默认9999(非必填)xxl.RegistryKey("golang-jobs"), //执行器名称xxl.SetLogger(&logger{}), //自定义日志)
2.调用执行器注册接口(/api/registry)注册执行器到调度中心(20秒心跳防止过期)
func (e *executor) registry() {t := time.NewTimer(time.Second * 0) //初始立即执行defer t.Stop()req := &Registry{RegistryGroup: "EXECUTOR",RegistryKey: e.opts.RegistryKey,RegistryValue: "http://" + e.address,}param, err := json.Marshal(req)if err != nil {log.Fatal("执行器注册信息解析失败:" + err.Error())}for {<-t.Ct.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期func() {result, err := e.post("/api/registry", string(param))if err != nil {e.log.Error("执行器注册失败1:" + err.Error())return}defer result.Body.Close()body, err := ioutil.ReadAll(result.Body)if err != nil {e.log.Error("执行器注册失败2:" + err.Error())return}res := &res{}_ = json.Unmarshal(body, &res)if res.Code != 200 {e.log.Error("执行器注册失败3:" + string(body))return}e.log.Info("执行器注册成功:" + string(body))}()}}
3.设置日志查看 handler,用于任务调度中心远程查看执行器日志
exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{FromLineNum: req.FromLineNum,ToLineNum: 2,LogContent: "这个是自定义日志handler",IsEnd: true,}}})
4.注册任务 handler
exec.RegTask("task.test", task.Test)exec.RegTask("task.test2", task.Test2)exec.RegTask("task.panic", task.Panic)
加锁,放到内存中
func (t *taskList) Set(key string, val *Task) {t.mu.Lock()t.data[key] = valt.mu.Unlock()}
5.创建路由规则
// 创建路由器mux := http.NewServeMux()// 设置路由规则mux.HandleFunc("/run", e.runTask)mux.HandleFunc("/kill", e.killTask)mux.HandleFunc("/log", e.taskLog)
runTask
流程:
1.解析调用中心发送的请求参数
2.加入到 runlist
3.根据executorHandler的key值找到对应的任务
4.运行任务
5.回调调度中心
6.从 runlist 中删除
7.返回给调度中心 OK
killTask
流程:
1.解析调用中心发送的请求参数
2.判断是否在runlist里
3.中止任务
4.从 runlist 中删除任务
5.返回给调度中心 OK
taskLog
流程:
1.解析调用中心发送的请求参数
2.找到对应任务的 logHandler
3.返回给调度中心 log 信息
6.创建服务器,监听端口
server := &http.Server{Addr: e.address,WriteTimeout: time.Second * 3,Handler: mux,}// 监听端口并提供服务e.log.Info("Starting server at " + e.address)go server.ListenAndServe()
7.如果服务器意外退出,则移除注册信息
t := time.NewTimer(time.Second * 0) //初始立即执行defer t.Stop()req := &Registry{RegistryGroup: "EXECUTOR",RegistryKey: e.opts.RegistryKey,RegistryValue: "http://" + e.address,}param, err := json.Marshal(req)if err != nil {e.log.Error("执行器摘除失败:" + err.Error())}res, err := e.post("/api/registryRemove", string(param))if err != nil {e.log.Error("执行器摘除失败:" + err.Error())}body, err := ioutil.ReadAll(res.Body)e.log.Info("执行器摘除成功:" + string(body))_ = res.Body.Close()
调度中心/执行器 RESTful API
XXL-JOB 目标是一种跨平台、跨语言的任务调度规范和协议。
针对Java应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心。
针对非Java应用,可借助 XXL-JOB 的标准 RESTful API 方便的实现多语言支持。
调度中心 RESTful API:
说明:调度中心提供给执行器使用的API;不局限于官方执行器使用,第三方可使用该API来实现执行器;
API列表:执行器注册、任务结果回调等;
执行器 RESTful API :
说明:执行器提供给调度中心使用的API;官方执行器默认已实现,第三方执行器需要实现并对接提供给调度中心;
API列表:任务触发、任务终止、任务日志查询……等;
此处 RESTful API 主要用于非Java语言定制个性化执行器使用,实现跨语言。除此之外,如果有需要通过API操作调度中心,可以个性化扩展 “调度中心 RESTful API” 并使用。
调度中心 RESTful API
a、任务回调
说明:执行器执行完任务后,回调任务结果时使用------地址格式:{调度中心跟地址}/callbackHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:[{"logId":1, // 本次调度日志ID"logDateTim":0, // 本次调度日志时间"executeResult":{"code": 200, // 200 表示任务执行正常,500表示失败"msg": null}}]响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
b、执行器注册
说明:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度------地址格式:{调度中心跟地址}/registryHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"registryGroup":"EXECUTOR", // 固定值"registryKey":"xxl-job-executor-example", // 执行器AppName"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务地址}响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
c、执行器注册摘除
说明:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行------地址格式:{调度中心跟地址}/registryRemoveHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"registryGroup":"EXECUTOR", // 固定值"registryKey":"xxl-job-executor-example", // 执行器AppName"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址}响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
执行器 RESTful API
a、心跳检测
说明:调度中心检测执行器是否在线时使用------地址格式:{执行器内嵌服务跟地址}/beatHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
b、忙碌检测
说明:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用------地址格式:{执行器内嵌服务跟地址}/idleBeatHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"jobId":1 // 任务ID}响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
c、触发任务
说明:触发任务执行------地址格式:{执行器内嵌服务跟地址}/runHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"jobId":1, // 任务ID"executorHandler":"demoJobHandler", // 任务标识"executorParams":"demoJobHandler", // 任务参数"executorBlockStrategy":"COVER_EARLY", // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum"executorTimeout":0, // 任务超时时间,单位秒,大于零时生效"logId":1, // 本次调度日志ID"logDateTime":1586629003729, // 本次调度日志时间"glueType":"BEAN", // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum"glueSource":"xxx", // GLUE脚本代码"glueUpdatetime":1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新"broadcastIndex":0, // 分片参数:当前分片"broadcastTotal":0 // 分片参数:总分片}响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
f、终止任务
说明:终止任务------地址格式:{执行器内嵌服务跟地址}/killHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"jobId":1 // 任务ID}响应数据格式:{"code": 200, // 200 表示正常、其他失败"msg": null // 错误提示消息}
d、查看执行日志
说明:终止任务,滚动方式加载------地址格式:{执行器内嵌服务跟地址}/logHeader:XXL-JOB-ACCESS-TOKEN : {请求令牌}请求数据格式如下,放置在 RequestBody 中,JSON格式:{"logDateTim":0, // 本次调度日志时间"logId":0, // 本次调度日志ID"fromLineNum":0 // 日志开始行号,滚动加载日志}响应数据格式:{"code":200, // 200 表示正常、其他失败"msg": null // 错误提示消息"content":{"fromLineNum":0, // 本次请求,日志开始行数"toLineNum":100, // 本次请求,日志结束行号"logContent":"xxx", // 本次请求日志内容"isEnd":true // 日志是否全部加载完}}
最后的彩蛋:
XXL-JOB-PLUS: XXL-JOB的加强版
https://github.com/mousycoder/xxl-job-plus




