分布式任务调度
1、什么是分布式任务调度?
任务调度是指基于给定的时间点,给定的时间间隔或者给定执行次数自动的执行任务。任务调度是是操作系统的重要组成部分,而对于实时的操作系统,任务调度直接影响着操作系统的实时性能。任务调度涉及到多线程并发、运行时间规则定制及解析、线程池的维护等诸多方面的工作。
三个关键词:分布式、任务调度、配置中心。
分布式:平台是分布式部署的,各个节点之间可以无状态和无限的水平扩展,分布式任务调度就是要解决在多个节点上只执行一次还是都执行的问题。 任务调度:涉及到任务状态管理、任务调度请求的发送与接收、具体任务的分配、任务的具体执行;(这里又会遇到一共要处理哪些任务、任务要分配到哪些机器上处理、任务分发的时候判断哪些机器可以用等问题,所以又需要一个可以感知整个集群运行状态的配置中心) 配置中心:可以感知整个集群的状态、任务信息的注册
一个分布式任务调度系统需要以下内容:
web模块、server模块、scheduler模块、worker模块、注册中心。
1.Web模块:用来提供任务的信息,控制任务的状态、信息展示等。2.Server模块:负责接收web端传来的任务执行的信息,下发任务调度请求给Scheduler,会去注册中心进行注册3.Scheduler模块:接收server端传来的调度请求,将任务进行更加细化的拆分然后下发,到注册中心进行注册,获取到可以干活的worker。4.Worker模块:负责具体的任务执行。5.注册中心。
2、任务调度和消息的差异
2.1可互换场景
通知类:电商发货成功发短信通知客户的业务场景,我们可以在发货成功后发送MQ消息到队列,然后去消费mq消息,发送短信。
2.2非互换场景
•时间驱动/事件驱动:内部系统一般可以通过时间来驱动,但涉及到外部系统,则只能使用时间驱动。如怕取外部网站价格,每小时爬一次。•批量处理/逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理。如移动每个月结算我们的话费。•实时性/非实时性:消息中间件能够做到实时处理数据,但是有些情况下并不需要实时,比如:vip升级。•系统内部/系统解耦:定时任务调度一般是在系统内部,而消息中间件可用于两个系统间。
3、单机实现
•Timer:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程•Thread:配置一个永远执行的进程实现定时任务•ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间•Spring定时框架(@Schedule):配置简单功能较多,如果系统使用单机的话可以优先考虑Spring定时器
4、分步是任务调度考虑的问题
•简单-对开发者接入简单,对使用者使用简单。•丰富的文档,有很多开源的项目文档少之又少,当然还有一些开源项目只有英文文档,如果你英文不是很行,那可能需要考虑中文居多的文档。•有管理界面,很方便执行操作和统计数据。•支持主流框架:比如Spring,Springboot等,当然这个至少要支持你们业务中的主流框架。•框架轻量级,方便根据自己的需求进行定制化。•高性能,高可靠,高可用:不能让框架成为业务中的瓶颈。•代码更新频率和社区使用情况:使用的公司越多证明其越受更多人的喜爱,代码更新频率越高证明出现问题就会越少,最好是由大厂开源并且维护。•多语言需求:如果在你们业务中有多语言需求,比如你们公司用的开发语言很多,都需要调度框架那么你需要使用多语言支持。比如Rpc支持多语言的代表就是Thrift。•能否解决当前的痛点:这个是最重要的,如果连你问题都解决不了那使用这个还有什么意义呢?
5、开源分布式任务调度框架对比
| 对比内容 | cronsun | xxl-job | Elastic-job | saturn | lts | TBSchedule |
| 项目背景 | 替代crontab | 大众点评、文档齐全 | 当当网开源、文档齐全 | 唯品会自主研发,基于当当的Elastic-job版本1 | -- | 阿里早期开源、代码略陈旧 |
| 语言 | Go | Java | Java | Java | Java | Java |
| 依赖 | etcd、mongodb | mysql ,jdk1.7+ , maven3.0+ | jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ ,mesos,mysql | JDK 7 or JDK 8 Maven 3.0.4+ node.js 8.7.0+ npm 5.4.2+ docker (版本不限) | CentOs maven java MySQL | |
| 多平台接入 | 大众点评、优信二手车等70个以上的公司接入 | |||||
| 易用、易部署 | 安装简单、使用简便 | 支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手 | 易用易部署 | 可以很好的部署到docker容器上 | 易用,难部署 | |
| web界面统一管理任务 | cronweb界面可以管理任务 | xxl-job-admin提供了web界面,同时也是作为任务调度中心 | 提供运维界面elastic-job-lite-console,可以管理作业和注册中心 | Saturn Console | lts-admin | |
| 多节点部署时任务不能重复执行 | 支持任务单次执行 | “调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行 | 容错处理:作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行 幂等性:重复作业任务项判定,不重复执行已运行的作业任务项 | 容错处理 | 支持 | |
| 日志可追溯 | 有日志查询界面,并已支持日志清除 | 支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志(数据库) | 支持,有日志查询界面,job的运行日志将会记录到数据库里 | saturn-executor-log.log(executor日志)saturn-executor.log(job日志) | 在TaskTracker端提供了业务日志记录器 | |
| 弹性扩容缩容 | 一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务 | 运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行 | 能支持容器化技术进行自动executor扩容和减容,保证高峰期处理能力的弹性伸缩。 | 因为LTS各个节点都是无状态的,所以支持动态增加删除节点,达到负载均衡的目的 | (2) TBSchedule的宿主服务器可以进行动态的扩容和资源回收 | |
| 报警 | 邮件提醒,由cronweb界面发出,而不是cronnode | 任务失败邮件报警,支持配置多邮箱地址,用逗号隔开 | 可通过事件订阅自行实现 | Saturn提供了完善的监控告警机制,能方便开发者进行监控 | ||
| 阻塞处理策略 | 多机单任务(防止单机挂掉任务不按时执行) | 策略包括:单机串行、丢弃后续调度、覆盖之前的调度 | zk的session timeout时间,超过这个时间临时节点将会被清除,作业才会重新分片 | 设置了timeout的作业开始执行后会启动超时检测,如果执行超时,则会停止当前作业的执行 | ||
| 支持并行调度 | 多节点 | 调度系统多线程触发调度运行,确保调度精确执行,不被阻塞 | 将一个任务分为多个小任务在多台服务器上执行 | 分片 | 多节点 | 支持 |
| 失败处理策略 | 任务失败重试 | 失败告警、失败重试 | 失败转移、被错过的作业重触发 | 支持作业HA,负载均衡和失败转移 | 当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。当执行任务的TaskTracker节点当机之后,JobTracker 会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。 | 失效转移 |
| 高可用 | 当某个节点死机的时候可以自动调度到正常的节点执行 | 如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求 | 调度器的高可用是通过运行几个指向同一个ZooKeeper集群的Elastic-Job-Cloud-Scheduler实例来实现的。ZooKeeper用于在当前主Elastic-Job-Cloud-Scheduler实例失败的情况下执行领导者选举。通过至少两个调度器实例来构成集群,集群中只有一个调度器实例提供服务,其他实例处于”待命”状态。当该实例失败时,集群会选举剩余实例中的一个来继续提供服务。 | Saturn是面向任务的,能够监控到executor的状态,在executor下线或者上线的时,均会对任务分片进行重分配,保证其可用性。 | 由于上面的失败处理策略,实现了任务的高可用 | 支持 |
| 路由策略 | 故障转移 | 第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等 | 失效转移 | 失败转移、负载均衡 | 故障转移 | |
| 动态分片 | 分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。 | 默认包含三种分片策略:基于平均分配算法的分片策略、 作业名的哈希值奇偶数决定IP升降序算法的分片策略、根据作业名的哈希值对Job实例列表进行轮转的分片策略,支持自定义分片策略 | 由用户在UI界面输入 | TBSchedule的分布式机制是通过灵活的Sharding方式实现的,比如可以按所有数据的ID按10取模分片、按月份分片等,根据不同的场景由客户端配置分片规则。 | ||
| 任务类型 | 普通任务、单机单进程任务、一个任务执行间隔内允许执行一次 | 提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS等类型脚本 | 支持OneOff,Perpetual和SequencePerpetual三种作业模式 | 提供除java作业的基础上增加了shell作业的支持。主要可以支持类似PHP, Python等脚本作业 | 实时任务:提交了之后立即就要执行的任务。定时任务:在指定时间点执行的任务,譬如 今天3点执行(单次)。Cron任务:CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * ? Repeat任务:譬如每隔5分钟执行一次,重复50次就停止。 | |
| 任务在业务中的状态流转 | 所有的任务都会存储在一个分布式etcd里,单个crond部署成一个服务,也就是图中所示的node.1、node.2、node.n等,然后再由web界面去管理 | 将调度的行为抽象成“调度中心”公共平台,“调度中心”负责发起调度请求,将任务抽象成JobHandler,交由执行器管理,执行器负责接收调度请求并执行相应的JobHandler中的业务逻辑 | 首先配置任务的相关信息,然后任务去注册中心进行注册,注册中心完成主节点选举和分片策略的设置。Quarzt job一旦触发,则任务执行。任务执行后,处理错过的任务以及监听事件 | 将作业在逻辑上划分为若干个分片,通过作业分片调度器将作业分片指派给特定的执行节点。执行节点通过quartz触发执行作业的具体实现,在执行的时候,会将分片序号和参数作为参数传入。作业的实现逻辑需分析分片序号和分片参数,并以此为依据来调用具体的实现 | 任务到注册中心进行注册,JobClient将任务进行提交,JobTracker接收并分配任务,TaskTracker执行任务,并将结果反馈给客户端 | |
| 任务进度监控 | 在cronweb添加完成任务之后,在任务标签页就能看到所有添加的定时任务以及执行情况 | 支持实时监控任务进度 | 监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。 | Saturn通过统一的控制台进行全部域及全部作业的配置,执行情况监控,结点监控等 | 这些业务日志可以通过任务ID串联起来,可以在LTS-Admin中实时查看任务的执行进度 | schedule管理控制台。负责控制、监控任务执行状态。 |
| 自定义任务参数 | 在cronweb添加定时任务时,可以指定任务执行脚本和参数 | 支持在线配置调度任务入参,即时生效 | 在修改任务时,可以自定义参数 | 可以在界面中配置任务的参数 | ||
| 安全设置 | 支持security.json安全设置 | 调度中心向执行器发送的调度请求时使用RequestModel和ResponseModel两个对象封装调度请求参数和响应数据, 在进行通讯之前底层会将上述两个对象对象序列化,并进行数据协议以及时间戳检验,从而达到数据加密的功能 | ||||
| 开源地址 | http://github.com/shunfei/cronsun | https://github.com/xuxueli/xxl-job | https://github.com/elasticjob/elastic-job-lite | https://github.com/vipshop/Saturn | https://github.com/ltsopensource/light-task-scheduler |


综上,选择xxl-job。
6、XXL-JOB
详细的使用说明在官网有非常详尽的文档说明,这里只说明本地应用部分和针对源码进行分析。
框架图

6.1调度中心源码解读
因为时候springboot工程,所以直接从配置文件入手
/*** @author xuxueli 2018-10-28 00:18:17*/@Component@DependsOn("xxlJobAdminConfig")public class XxlJobScheduler implements InitializingBean, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);@Overridepublic void afterPropertiesSet() throws Exception {// init i18n 国际化支持,通过配置文件中配置item的中英文实现initI18n();// admin registry monitor run// 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现JobRegistryMonitorHelper.getInstance().start();// admin monitor run// 启动失败日志监控线程JobFailMonitorHelper.getInstance().start();// admin-serverinitRpcProvider();// start-schedule// 执行定时任务JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}@Overridepublic void destroy() throws Exception {// stop-scheduleJobScheduleHelper.getInstance().toStop();// admin trigger pool stopJobTriggerPoolHelper.toStop();// admin registry stopJobRegistryMonitorHelper.getInstance().toStop();// admin monitor stopJobFailMonitorHelper.getInstance().toStop();// admin-serverstopRpcProvider();}// ---------------------- I18n ----------------------private void initI18n(){for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));}}// ---------------------- admin rpc provider (no server version) ----------------------//启动注册中心的RPC服务,使得执行器项目可以通过RPC进行注册和心跳检测private static ServletServerHandler servletServerHandler;private void initRpcProvider(){// initXxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP,Serializer.SerializeEnum.HESSIAN.getSerializer(),null,0,XxlJobAdminConfig.getAdminConfig().getAccessToken(),null,null);// add servicesxxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());// servlet handlerservletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);}private void stopRpcProvider() throws Exception {XxlRpcInvokerFactory.getInstance().stop();}public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {servletServerHandler.handle(null, request, response);}// ---------------------- executor-client ----------------------private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();public static ExecutorBiz getExecutorBiz(String address) throws Exception {// validif (address==null || address.trim().length()==0) {return null;}// load-cacheaddress = address.trim();ExecutorBiz executorBiz = executorBizRepository.get(address);if (executorBiz != null) {return executorBiz;}// set-cacheexecutorBiz = (ExecutorBiz) new XxlRpcReferenceBean(NetEnum.NETTY_HTTP,Serializer.SerializeEnum.HESSIAN.getSerializer(),CallType.SYNC,LoadBalance.ROUND,ExecutorBiz.class,null,3000,address,XxlJobAdminConfig.getAdminConfig().getAccessToken(),null,null).getObject();executorBizRepository.put(address, executorBiz);return executorBiz;}}
重点一:启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现
JobRegistryMonitorHelper.getInstance().start();
public class JobRegistryMonitorHelper {private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();public static JobRegistryMonitorHelper getInstance(){return instance;}private Thread registryThread;private volatile boolean toStop = false;public void start(){// 创建一个线程registryThread = new Thread(new Runnable() {@Overridepublic void run() {// 当toStop 为false时进入该循环。while (!toStop) {try {// auto registry group// 获取类型为自动注册的执行器地址列表List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)// 删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);if (ids!=null && ids.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 查询在90秒之内有过更新的机器列表HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);if (list != null) {// 循环注册机器列表,根据执行器不同,将这些机器列表区分拿出来for (XxlJobRegistry item: list) {// 判断该机器注册信息RegistryGroup是否是EXECUTOR , EXECUTOR代表该机器是注册到执行器上面的// RegistType分为ADMIN 和EXECUTORif (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {// 获取注册的执行器 KEY (也就是执行器)String appName = item.getRegistryKey();List<String> registryList = appAddressMap.get(appName);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}// 收集机器信息,根据执行器做区分appAddressMap.put(appName, registryList);}}}// fresh group address// 遍历执行器列表,根据上面的心跳检测有限的地址更新回数据库for (XxlJobGroup group: groupList) {// 通过执行器的APP_NAME拿出他下面的集群机器地址List<String> registryList = appAddressMap.get(group.getAppName());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);addressListStr = "";for (String item:registryList) {addressListStr += item + ",";}// 转为为String,通过逗号分隔addressListStr = addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);// 将这个执行器的集群机器地址列表,写入到数据库XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});registryThread.setDaemon(true);registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");registryThread.start();}public void toStop(){toStop = true;// interrupt and waitregistryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}
start方法主要作用是开通了一个守护线程,每隔30s扫描一次执行器的注册信息表
•剔除90s内没有进行健康检查的执行器信息•将自动注册类型的执行器注册信息(XxlJobRegistry)经过处理更新执行器信息(XxlJobGroup)
重点二:启动失败日志监控线程,监控任务的执行状态, 如若失败,则发送邮件预警
JobFailMonitorHelper.getInstance().start();
public class JobFailMonitorHelper {private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class);private static JobFailMonitorHelper instance = new JobFailMonitorHelper();public static JobFailMonitorHelper getInstance(){return instance;}// ---------------------- monitor ----------------------private Thread monitorThread;private volatile boolean toStop = false;public void start(){monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// monitorwhile (!toStop) {try {// 查询失败的failLogIdsList<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (failLogIds!=null && !failLogIds.isEmpty()) {// 遍历失败的日志for (long failLogId: failLogIds) {// lock log// 更新提醒状态,如果已经提醒则不再提醒int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet < 1) {continue;}XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor// 根据执行反馈判断是否继续触发任务,可能任务触发但是未收到反馈则继续触发if (log.getExecutorFailRetryCount() > 0) {JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";log.setTriggerMsg(log.getTriggerMsg() + retryMsg);XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitor// 任务执行失败, 执行发送邮件等预警措施int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {boolean alarmResult = true;try {alarmResult = failAlarm(info, log);} catch (Exception e) {alarmResult = false;logger.error(e.getMessage(), e);}newAlarmStatus = alarmResult?2:3;} else {newAlarmStatus = 1;}XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}// 停顿一下TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");}});monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobFailMonitorHelper");monitorThread.start();}public void toStop(){toStop = true;// interrupt and waitmonitorThread.interrupt();try {monitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// ---------------------- alarm ----------------------// email alarm templateprivate static final String mailBodyTemplate = "<h5>" + I18nUtil.getString("jobconf_monitor_detail") + ":</span>" +"<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +" <thead style=\"font-weight: bold;color: #ffffff;background-color: #ff8c00;\" >" +" <tr>\n" +" <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobgroup") +"</td>\n" +" <td width=\"10%\" >"+ I18nUtil.getString("jobinfo_field_id") +"</td>\n" +" <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobdesc") +"</td>\n" +" <td width=\"10%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_title") +"</td>\n" +" <td width=\"40%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_content") +"</td>\n" +" </tr>\n" +" </thead>\n" +" <tbody>\n" +" <tr>\n" +" <td>{0}</td>\n" +" <td>{1}</td>\n" +" <td>{2}</td>\n" +" <td>"+ I18nUtil.getString("jobconf_monitor_alarm_type") +"</td>\n" +" <td>{3}</td>\n" +" </tr>\n" +" </tbody>\n" +"</table>";/*** fail alarm** @param jobLog*/private boolean failAlarm(XxlJobInfo info, XxlJobLog jobLog){boolean alarmResult = true;// send monitor emailif (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {// alarmContentString alarmContent = "Alarm Job LogId=" + jobLog.getId();if (jobLog.getTriggerCode() != ReturnT.SUCCESS_CODE) {alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();}if (jobLog.getHandleCode()>0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();}// email infoXxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(Integer.valueOf(info.getJobGroup()));String personal = I18nUtil.getString("admin_name_full");String title = I18nUtil.getString("jobconf_monitor");String content = MessageFormat.format(mailBodyTemplate,group!=null?group.getTitle():"null",info.getId(),info.getJobDesc(),alarmContent);Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));for (String email: emailSet) {// make mailtry {MimeMessage mimeMessage = XxlJobAdminConfig.getAdminConfig().getMailSender().createMimeMessage();MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);helper.setFrom(XxlJobAdminConfig.getAdminConfig().getEmailUserName(), personal);helper.setTo(email);helper.setSubject(title);helper.setText(content, true);XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job, job fail alarm email send error, JobLogId:{}", jobLog.getId(), e);alarmResult = false;}}}// do something, custom alarm strategy, such as smsreturn alarmResult;}}
也就是说这里主要作用是开通了一个守护线程,每隔10s扫描一次失败日志
•如果任务失败可重试次数>0,那么重新触发任务•如果任务执行失败,会进行告警,默认采用邮件形式进行告警
重点三:xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息
JobScheduleHelper.getInstance().start();
6.2客户端源码解读
实际在执行器应用中,内嵌了一个jetty服务器, 服务在xxlJobExecutor 初始化的时候启动。当执行器端启动时会定时向注册中心进行自动注册,并且当调度中心有任务触发的时候也会发起RPC请求,请求执行器执行具体的任务
public class XxlJobExecutor {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String appName;private String ip;private int port;private String accessToken;private String logPath;private int logRetentionDays;public void setAdminAddresses(String adminAddresses) {this.adminAddresses = adminAddresses;}public void setAppName(String appName) {this.appName = appName;}public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public void setAccessToken(String accessToken) {this.accessToken = accessToken;}public void setLogPath(String logPath) {this.logPath = logPath;}public void setLogRetentionDays(int logRetentionDays) {this.logRetentionDays = logRetentionDays;}// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpath// 初始化本地日志路径XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client// 初始化调度中心的地址列表, 通过XxlRpcReferenceBean创建好adminBiz实例initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThread// 启动一个线程,用来清理本地日志, 默认保留最近一天的日志JobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThread// 初始化callback进程TriggerCallbackThread.getInstance().start();// init executor-server// 任务执行providerport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();initRpcProvider(ip, port, appName, accessToken);}public void destroy(){// destory jobThreadRepositoryif (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {removeJobThread(item.getKey(), "web container destroy and kill the job.");}jobThreadRepository.clear();}jobHandlerRepository.clear();// destory JobLogFileCleanThreadJobLogFileCleanThread.getInstance().toStop();// destory TriggerCallbackThreadTriggerCallbackThread.getInstance().toStop();// destory executor-serverstopRpcProvider();// destory invokerstopInvokerFactory();}// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private static Serializer serializer;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {String addressUrl = address.concat(AdminBiz.MAPPING);AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.NETTY_HTTP,serializer,CallType.SYNC,LoadBalance.ROUND,AdminBiz.class,null,3000,addressUrl,accessToken,null,null).getObject();if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}private void stopInvokerFactory(){// stop invoker factorytry {XxlRpcInvokerFactory.getInstance().stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}public static Serializer getSerializer() {return serializer;}// ---------------------- executor-server (rpc provider) ----------------------private XxlRpcProviderFactory xxlRpcProviderFactory = null;private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {// init, provider factory// 初始化地址,如果端口为空,默认9999为jetty的服务器端口String address = IpUtil.getIpPort(ip, port);Map<String, String> serviceRegistryParam = new HashMap<String, String>();serviceRegistryParam.put("appName", appName);serviceRegistryParam.put("address", address);xxlRpcProviderFactory = new XxlRpcProviderFactory();xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);// add services// 创建一个ExecutorService 实例,放入Map中,后面会通过class获取到他的实例执行run方法xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());// start// jetty + registryxxlRpcProviderFactory.start();}public static class ExecutorServiceRegistry extends ServiceRegistry {@Overridepublic void start(Map<String, String> param) {// start registryExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));}@Overridepublic void stop() {// stop registryExecutorRegistryThread.getInstance().toStop();}@Overridepublic boolean registry(Set<String> keys, String value) {return false;}@Overridepublic boolean remove(Set<String> keys, String value) {return false;}@Overridepublic Map<String, TreeSet<String>> discovery(Set<String> keys) {return null;}@Overridepublic TreeSet<String> discovery(String key) {return null;}}private void stopRpcProvider() {// stop provider factorytry {xxlRpcProviderFactory.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}// ---------------------- job thread repository ----------------------private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public static void removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}}public static JobThread loadJobThread(int jobId){JobThread jobThread = jobThreadRepository.get(jobId);return jobThread;}}
6.3客户端使用
6.3.1引入依赖
<!-- xxl-job-core --><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.1.0</version></dependency>
6.3.2配置
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册xxl.job.executor.appname=xxl-job-executor-sample### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";xxl.job.executor.ip=### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;xxl.job.executor.port=9999### 执行器通讯TOKEN [选填]:非空时启用;xxl.job.accessToken=### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler### 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;xxl.job.executor.logretentiondays=-1
6.3.3配置类
/*** xxl-job config** @author xuxueli 2017-04-28*/@Configurationpublic class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appName;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Bean(initMethod = "start", destroyMethod = "destroy")public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppName(appName);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;** 1、引入依赖:* <dependency>* <groupId>org.springframework.cloud</groupId>* <artifactId>spring-cloud-commons</artifactId>* <version>${version}</version>* </dependency>** 2、配置文件,或者容器启动变量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、获取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
6.3.4执行器类
/*** 任务Handler示例(Bean模式)** 开发步骤:* 1、继承"IJobHandler":“com.xxl.job.core.handler.IJobHandler”;* 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;* 3、注册到执行器工厂:添加“@JobHandler(value="自定义jobhandler名称")”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值。* 4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;** @author xuxueli 2015-12-19 19:43:36*/@JobHandler(value="demoJobHandler")@Componentpublic class DemoJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {XxlJobLogger.log("XXL-JOB, Hello World.");for (int i = 0; i < 5; i++) {XxlJobLogger.log("beat at:" + i);TimeUnit.SECONDS.sleep(2);}return SUCCESS;}}




