暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

xxl-job go语言接入指南

Java开发必备手册 2021-05-18
4075

介绍

XXL-JOB任务调度平台分为2个部分,Scheduler和Executor。具体的实现Scheduler对应是xxl-job-admin,同时xxl-job-admin还配有web UI,可以配置管理任务。

Scheduler和Executor之间通过HTTP API交互,因此Executor可以通过各种语言实现。


以上图为例,scheduleThread将任务通过Executor的/run api推送给Executor

  1. {

  2. "jobId": 3,

  3. "executorHandler": "task.test",

  4. "executorParams": "x=100",

  5. "executorBlockStrategy": "SERIAL_EXECUTION",

  6. "executorTimeout": 0,

  7. "logId": 17,

  8. "logDateTime": 1606100913829,

  9. "glueType": "BEAN",

  10. "glueSource": "",

  11. "glueUpdatetime": 1606099566000,

  12. "broadcastIndex": 0,

  13. "broadcastTotal": 1

  14. }

Executor会根据executorHandler找到对应的handler,执行完之后,又会调用xxl-job-admin的/xxl-job-admin/api/callback回报任务的执行结果。从上面的描述我们可以知道,xxl-job-admin和excutor都必须暴露出api服务(都是HTTP接口)。

Scheduler可以有多个。它们之间通过MySQL进行同步。

主要的调度逻辑在JobScheduleHelper中

在每一轮执行调度逻辑之前, Scheduler必须先获得行锁

  1. while (!scheduleThreadToStop) {

  2. ...

  3. // 加行锁

  4. try {

  5. preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

  6. preparedStatement.execute();

  7. ...

  8. } catch (Exception e) { ... } finally {

  9. ...

  10. // 注意:锁必须正常释放

  11. conn.commit();

  12. ...

  13. }

由于xxl_job_lock 表中只有一条记录,所以这个逻辑与请求表锁类似,开销是比较大的。

其实这里还可以利用分治法的思想,让不同的任务对应到不同的行锁。来提高整体的并发度。依我推测, xxl-job 设计时考虑就是调度任务的数量不会太多。因此性能不是它的最主要关注点。

xxl-job内部没有使用Zookeeper这种数据库,因此在高可用性上与Quartz相比还是会稍微弱一些。好在它依赖少,搭建、学习的成本就会非常低。

对MySQL而言,如果xxl-job-admin在持有行锁的期间发生异常退出,与MySQL的连接断开。一段时间之后,MySQL会自动主动释放这个行锁。因此并不会出现死锁的问题。

接入指南

xxl-job-go-sdk

xxl-job go 客户端


支持

  1. 1.执行器注册

  2. 2.耗时任务取消

  3. 3.任务注册,像写http.Handler一样方便

  4. 4.任务panic处理

  5. 5.阻塞策略处理

  6. 6.任务完成支持返回执行备注

  7. 7.任务超时取消 (单位:秒,0为不限制)

  8. 8.失败重试次数(在参数param中,目前由任务自行处理)

  9. 9.可自定义日志

  10. 10.自定义日志查看handler

  11. 11.支持外部路由(可与gin集成)

How to get

 go get github.com/mousycoder/xxl-job-go-sdk 

Example

  1. package main

  2. import (

  3. "fmt"

  4. xxl “github.com/mousycoder/xxl-job-go-sdk"

  5. “github.com/xxl-job-go-sdk/example/task"

  6. "log"

  7. )

  8. func main() {

  9. exec := xxl.NewExecutor(

  10. xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),

  11. xxl.AccessToken(""), //请求令牌(默认为空)

  12. xxl.ExecutorIp("127.0.0.1"), //可自动获取

  13. xxl.ExecutorPort("9999"), //默认9999(非必填)

  14. xxl.RegistryKey("golang-jobs"), //执行器名称

  15. xxl.SetLogger(&logger{}), //自定义日志

  16. )

  17. exec.Init()

  18. //设置日志查看handler

  19. exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {

  20. return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{

  21. FromLineNum: req.FromLineNum,

  22. ToLineNum: 2,

  23. LogContent: "这个是自定义日志handler",

  24. IsEnd: true,

  25. }}

  26. })

  27. //注册任务handler

  28. exec.RegTask("task.test", task.Test)

  29. exec.RegTask("task.test2", task.Test2)

  30. exec.RegTask("task.panic", task.Panic)

  31. log.Fatal(exec.Run())

  32. }

  33. //xxl.Logger接口实现

  34. type logger struct{}

  35. func (l *logger) Info(format string, a ...interface{}) {

  36. fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))

  37. }

  38. func (l *logger) Error(format string, a ...interface{}) {

  39. log.Println(fmt.Sprintf("自定义日志 - "+format, a...))

  40. }

xxl-job-admin配置

添加执行器

执行器管理->新增执行器,执行器列表如下:

  1. AppName 名称 注册方式 OnLine 机器地址 操作

  2. golang-jobs golang执行器 自动注册 查看 ( 1

查看->注册节点

  1. http://127.0.0.1:9999

添加任务

任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)

  1. 1 测试panic BEANtask.panic * 0 * * * ? admin STOP

  2. 2 测试耗时任务 BEANtask.test2 * * * * * ? admin STOP

  3. 3 测试golang BEANtask.test * * * * * ? admin STOP

SDK 源码地址

https://github.com/mousycoder/xxl-job-go-sdk

SDK 源码解析

1.初始化执行器信息(令牌,执行器名称,IP,端口,调度中心地址)

  1. exec := xxl.NewExecutor(

  2. xxl.ServerAddr("http://xxl-job.test.com:18088/xxl-job-admin"),

  3. xxl.AccessToken(“xxxxx"), //请求令牌(默认为空)

  4. //xxl.ExecutorIp("127.0.0.1"), 可自动获取

  5. xxl.ExecutorPort("9999"), //默认9999(非必填)

  6. xxl.RegistryKey("golang-jobs"), //执行器名称

  7. xxl.SetLogger(&logger{}), //自定义日志

  8. )

2.调用执行器注册接口(/api/registry)注册执行器到调度中心(20秒心跳防止过期)

  1. func (e *executor) registry() {

  2. t := time.NewTimer(time.Second * 0) //初始立即执行

  3. defer t.Stop()

  4. req := &Registry{

  5. RegistryGroup: "EXECUTOR",

  6. RegistryKey: e.opts.RegistryKey,

  7. RegistryValue: "http://" + e.address,

  8. }

  9. param, err := json.Marshal(req)

  10. if err != nil {

  11. log.Fatal("执行器注册信息解析失败:" + err.Error())

  12. }

  13. for {

  14. <-t.C

  15. t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期

  16. func() {

  17. result, err := e.post("/api/registry", string(param))

  18. if err != nil {

  19. e.log.Error("执行器注册失败1:" + err.Error())

  20. return

  21. }

  22. defer result.Body.Close()

  23. body, err := ioutil.ReadAll(result.Body)

  24. if err != nil {

  25. e.log.Error("执行器注册失败2:" + err.Error())

  26. return

  27. }

  28. res := &res{}

  29. _ = json.Unmarshal(body, &res)

  30. if res.Code != 200 {

  31. e.log.Error("执行器注册失败3:" + string(body))

  32. return

  33. }

  34. e.log.Info("执行器注册成功:" + string(body))

  35. }()

  36. }

  37. }

3.设置日志查看 handler,用于任务调度中心远程查看执行器日志

  1. exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {

  2. return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{

  3. FromLineNum: req.FromLineNum,

  4. ToLineNum: 2,

  5. LogContent: "这个是自定义日志handler",

  6. IsEnd: true,

  7. }}

  8. })

4.注册任务 handler

  1. exec.RegTask("task.test", task.Test)

  2. exec.RegTask("task.test2", task.Test2)

  3. exec.RegTask("task.panic", task.Panic)

加锁,放到内存中

  1. func (t *taskList) Set(key string, val *Task) {

  2. t.mu.Lock()

  3. t.data[key] = val

  4. t.mu.Unlock()

  5. }

5.创建路由规则

  1. // 创建路由器

  2. mux := http.NewServeMux()

  3. // 设置路由规则

  4. mux.HandleFunc("/run", e.runTask)

  5. mux.HandleFunc("/kill", e.killTask)

  6. mux.HandleFunc("/log", e.taskLog)

  • runTask

    流程:

    1.解析调用中心发送的请求参数

    2.加入到 runlist

    3.根据executorHandler的key值找到对应的任务

    4.运行任务

    5.回调调度中心

    6.从 runlist 中删除

    7.返回给调度中心 OK

  • killTask

    流程:

    1.解析调用中心发送的请求参数

    2.判断是否在runlist里

    3.中止任务

    4.从 runlist 中删除任务

    5.返回给调度中心 OK

  • taskLog

    流程:

    1.解析调用中心发送的请求参数

    2.找到对应任务的 logHandler

    3.返回给调度中心 log 信息

6.创建服务器,监听端口

  1. server := &http.Server{

  2. Addr: e.address,

  3. WriteTimeout: time.Second * 3,

  4. Handler: mux,

  5. }

  6. // 监听端口并提供服务

  7. e.log.Info("Starting server at " + e.address)

  8. go server.ListenAndServe()

7.如果服务器意外退出,则移除注册信息

  1. t := time.NewTimer(time.Second * 0) //初始立即执行

  2. defer t.Stop()

  3. req := &Registry{

  4. RegistryGroup: "EXECUTOR",

  5. RegistryKey: e.opts.RegistryKey,

  6. RegistryValue: "http://" + e.address,

  7. }

  8. param, err := json.Marshal(req)

  9. if err != nil {

  10. e.log.Error("执行器摘除失败:" + err.Error())

  11. }

  12. res, err := e.post("/api/registryRemove", string(param))

  13. if err != nil {

  14. e.log.Error("执行器摘除失败:" + err.Error())

  15. }

  16. body, err := ioutil.ReadAll(res.Body)

  17. e.log.Info("执行器摘除成功:" + string(body))

  18. _ = res.Body.Close()

调度中心/执行器 RESTful API

XXL-JOB 目标是一种跨平台、跨语言的任务调度规范和协议。

针对Java应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心。

针对非Java应用,可借助 XXL-JOB 的标准 RESTful API 方便的实现多语言支持。

  • 调度中心 RESTful API:


    • 说明:调度中心提供给执行器使用的API;不局限于官方执行器使用,第三方可使用该API来实现执行器;

    • API列表:执行器注册、任务结果回调等;

  • 执行器 RESTful API :


    • 说明:执行器提供给调度中心使用的API;官方执行器默认已实现,第三方执行器需要实现并对接提供给调度中心;

    • API列表:任务触发、任务终止、任务日志查询……等;

此处 RESTful API 主要用于非Java语言定制个性化执行器使用,实现跨语言。除此之外,如果有需要通过API操作调度中心,可以个性化扩展 “调度中心 RESTful API” 并使用。

调度中心 RESTful API

a、任务回调

  1. 说明:执行器执行完任务后,回调任务结果时使用

  2. ------

  3. 地址格式:{调度中心跟地址}/callback

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. [{

  8. "logId":1, // 本次调度日志ID

  9. "logDateTim":0, // 本次调度日志时间

  10. "executeResult":{

  11. "code": 200, // 200 表示任务执行正常,500表示失败

  12. "msg": null

  13. }

  14. }]

  15. 响应数据格式:

  16. {

  17. "code": 200, // 200 表示正常、其他失败

  18. "msg": null // 错误提示消息

  19. }

b、执行器注册

  1. 说明:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度

  2. ------

  3. 地址格式:{调度中心跟地址}/registry

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "registryGroup":"EXECUTOR", // 固定值

  9. "registryKey":"xxl-job-executor-example", // 执行器AppName

  10. "registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务地址

  11. }

  12. 响应数据格式:

  13. {

  14. "code": 200, // 200 表示正常、其他失败

  15. "msg": null // 错误提示消息

  16. }

c、执行器注册摘除

  1. 说明:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行

  2. ------

  3. 地址格式:{调度中心跟地址}/registryRemove

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "registryGroup":"EXECUTOR", // 固定值

  9. "registryKey":"xxl-job-executor-example", // 执行器AppName

  10. "registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址

  11. }

  12. 响应数据格式:

  13. {

  14. "code": 200, // 200 表示正常、其他失败

  15. "msg": null // 错误提示消息

  16. }

执行器 RESTful API

a、心跳检测

  1. 说明:调度中心检测执行器是否在线时使用

  2. ------

  3. 地址格式:{执行器内嵌服务跟地址}/beat

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. 响应数据格式:

  8. {

  9. "code": 200, // 200 表示正常、其他失败

  10. "msg": null // 错误提示消息

  11. }

b、忙碌检测

  1. 说明:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用

  2. ------

  3. 地址格式:{执行器内嵌服务跟地址}/idleBeat

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "jobId":1 // 任务ID

  9. }

  10. 响应数据格式:

  11. {

  12. "code": 200, // 200 表示正常、其他失败

  13. "msg": null // 错误提示消息

  14. }

c、触发任务

  1. 说明:触发任务执行

  2. ------

  3. 地址格式:{执行器内嵌服务跟地址}/run

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "jobId":1, // 任务ID

  9. "executorHandler":"demoJobHandler", // 任务标识

  10. "executorParams":"demoJobHandler", // 任务参数

  11. "executorBlockStrategy":"COVER_EARLY", // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum

  12. "executorTimeout":0, // 任务超时时间,单位秒,大于零时生效

  13. "logId":1, // 本次调度日志ID

  14. "logDateTime":1586629003729, // 本次调度日志时间

  15. "glueType":"BEAN", // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum

  16. "glueSource":"xxx", // GLUE脚本代码

  17. "glueUpdatetime":1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新

  18. "broadcastIndex":0, // 分片参数:当前分片

  19. "broadcastTotal":0 // 分片参数:总分片

  20. }

  21. 响应数据格式:

  22. {

  23. "code": 200, // 200 表示正常、其他失败

  24. "msg": null // 错误提示消息

  25. }

f、终止任务

  1. 说明:终止任务

  2. ------

  3. 地址格式:{执行器内嵌服务跟地址}/kill

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "jobId":1 // 任务ID

  9. }

  10. 响应数据格式:

  11. {

  12. "code": 200, // 200 表示正常、其他失败

  13. "msg": null // 错误提示消息

  14. }

d、查看执行日志

  1. 说明:终止任务,滚动方式加载

  2. ------

  3. 地址格式:{执行器内嵌服务跟地址}/log

  4. Header

  5. XXL-JOB-ACCESS-TOKEN : {请求令牌}

  6. 请求数据格式如下,放置在 RequestBody 中,JSON格式:

  7. {

  8. "logDateTim":0, // 本次调度日志时间

  9. "logId":0, // 本次调度日志ID

  10. "fromLineNum":0 // 日志开始行号,滚动加载日志

  11. }

  12. 响应数据格式:

  13. {

  14. "code":200, // 200 表示正常、其他失败

  15. "msg": null // 错误提示消息

  16. "content":{

  17. "fromLineNum":0, // 本次请求,日志开始行数

  18. "toLineNum":100, // 本次请求,日志结束行号

  19. "logContent":"xxx", // 本次请求日志内容

  20. "isEnd":true // 日志是否全部加载完

  21. }

  22. }

最后的彩蛋:

XXL-JOB-PLUS: XXL-JOB的加强版

https://github.com/mousycoder/xxl-job-plus


文章转载自Java开发必备手册,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论