暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分布式任务调度

AweSomeBaZinGa 2019-09-17
692


分布式任务调度

1、什么是分布式任务调度?

任务调度是指基于给定的时间点,给定的时间间隔或者给定执行次数自动的执行任务。任务调度是是操作系统的重要组成部分,而对于实时的操作系统,任务调度直接影响着操作系统的实时性能。任务调度涉及到多线程并发、运行时间规则定制及解析、线程池的维护等诸多方面的工作。

三个关键词:分布式、任务调度、配置中心

分布式:平台是分布式部署的,各个节点之间可以无状态和无限的水平扩展,分布式任务调度就是要解决在多个节点上只执行一次还是都执行的问题。 任务调度:涉及到任务状态管理、任务调度请求的发送与接收、具体任务的分配、任务的具体执行;(这里又会遇到一共要处理哪些任务、任务要分配到哪些机器上处理、任务分发的时候判断哪些机器可以用等问题,所以又需要一个可以感知整个集群运行状态的配置中心) 配置中心:可以感知整个集群的状态、任务信息的注册

一个分布式任务调度系统需要以下内容:

web模块、server模块、scheduler模块、worker模块、注册中心。

1.Web模块:用来提供任务的信息,控制任务的状态、信息展示等。2.Server模块:负责接收web端传来的任务执行的信息,下发任务调度请求给Scheduler,会去注册中心进行注册3.Scheduler模块:接收server端传来的调度请求,将任务进行更加细化的拆分然后下发,到注册中心进行注册,获取到可以干活的worker。4.Worker模块:负责具体的任务执行。5.注册中心。

2、任务调度和消息的差异

2.1可互换场景

通知类:电商发货成功发短信通知客户的业务场景,我们可以在发货成功后发送MQ消息到队列,然后去消费mq消息,发送短信。

2.2非互换场景

时间驱动/事件驱动:内部系统一般可以通过时间来驱动,但涉及到外部系统,则只能使用时间驱动。如怕取外部网站价格,每小时爬一次。批量处理/逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理。如移动每个月结算我们的话费。实时性/非实时性:消息中间件能够做到实时处理数据,但是有些情况下并不需要实时,比如:vip升级。系统内部/系统解耦:定时任务调度一般是在系统内部,而消息中间件可用于两个系统间。

3、单机实现

Timer:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程Thread:配置一个永远执行的进程实现定时任务ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间Spring定时框架(@Schedule):配置简单功能较多,如果系统使用单机的话可以优先考虑Spring定时器

4、分步是任务调度考虑的问题

简单-对开发者接入简单,对使用者使用简单。丰富的文档,有很多开源的项目文档少之又少,当然还有一些开源项目只有英文文档,如果你英文不是很行,那可能需要考虑中文居多的文档。有管理界面,很方便执行操作和统计数据。支持主流框架:比如Spring,Springboot等,当然这个至少要支持你们业务中的主流框架。框架轻量级,方便根据自己的需求进行定制化。高性能,高可靠,高可用:不能让框架成为业务中的瓶颈。代码更新频率和社区使用情况:使用的公司越多证明其越受更多人的喜爱,代码更新频率越高证明出现问题就会越少,最好是由大厂开源并且维护。多语言需求:如果在你们业务中有多语言需求,比如你们公司用的开发语言很多,都需要调度框架那么你需要使用多语言支持。比如Rpc支持多语言的代表就是Thrift。能否解决当前的痛点:这个是最重要的,如果连你问题都解决不了那使用这个还有什么意义呢?

5、开源分布式任务调度框架对比

对比内容cronsunxxl-jobElastic-jobsaturnltsTBSchedule
项目背景替代crontab大众点评、文档齐全当当网开源、文档齐全唯品会自主研发,基于当当的Elastic-job版本1--阿里早期开源、代码略陈旧
语言GoJavaJavaJavaJavaJava
依赖etcd、mongodbmysql ,jdk1.7+ , maven3.0+jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ ,mesos,mysqlJDK 7 or JDK 8 Maven 3.0.4+ node.js 8.7.0+ npm 5.4.2+ docker (版本不限)CentOs maven java MySQL
多平台接入
大众点评、优信二手车等70个以上的公司接入



易用、易部署安装简单、使用简便支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手易用易部署可以很好的部署到docker容器上易用,难部署
web界面统一管理任务cronweb界面可以管理任务xxl-job-admin提供了web界面,同时也是作为任务调度中心提供运维界面elastic-job-lite-console,可以管理作业和注册中心Saturn Consolelts-admin
多节点部署时任务不能重复执行支持任务单次执行“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行容错处理:作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行 幂等性:重复作业任务项判定,不重复执行已运行的作业任务项
容错处理支持
日志可追溯有日志查询界面,并已支持日志清除支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志(数据库)支持,有日志查询界面,job的运行日志将会记录到数据库里saturn-executor-log.log(executor日志)saturn-executor.log(job日志)在TaskTracker端提供了业务日志记录器
弹性扩容缩容
一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行能支持容器化技术进行自动executor扩容和减容,保证高峰期处理能力的弹性伸缩。因为LTS各个节点都是无状态的,所以支持动态增加删除节点,达到负载均衡的目的(2) TBSchedule的宿主服务器可以进行动态的扩容和资源回收
报警邮件提醒,由cronweb界面发出,而不是cronnode任务失败邮件报警,支持配置多邮箱地址,用逗号隔开可通过事件订阅自行实现Saturn提供了完善的监控告警机制,能方便开发者进行监控

阻塞处理策略多机单任务(防止单机挂掉任务不按时执行)策略包括:单机串行、丢弃后续调度、覆盖之前的调度zk的session timeout时间,超过这个时间临时节点将会被清除,作业才会重新分片设置了timeout的作业开始执行后会启动超时检测,如果执行超时,则会停止当前作业的执行

支持并行调度多节点调度系统多线程触发调度运行,确保调度精确执行,不被阻塞将一个任务分为多个小任务在多台服务器上执行分片多节点支持
失败处理策略任务失败重试失败告警、失败重试失败转移、被错过的作业重触发支持作业HA,负载均衡和失败转移当节点组中的一个节点当机之后,自动转到其他节点工作。当整个节点组当机之后,将会采用存储文件的方式,待节点组可用的时候进行重发。当执行任务的TaskTracker节点当机之后,JobTracker 会将这个TaskTracker上的未完成的任务(死任务),重新分配给节点组中其他节点执行。失效转移
高可用当某个节点死机的时候可以自动调度到正常的节点执行如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求调度器的高可用是通过运行几个指向同一个ZooKeeper集群的Elastic-Job-Cloud-Scheduler实例来实现的。ZooKeeper用于在当前主Elastic-Job-Cloud-Scheduler实例失败的情况下执行领导者选举。通过至少两个调度器实例来构成集群,集群中只有一个调度器实例提供服务,其他实例处于”待命”状态。当该实例失败时,集群会选举剩余实例中的一个来继续提供服务。Saturn是面向任务的,能够监控到executor的状态,在executor下线或者上线的时,均会对任务分片进行重分配,保证其可用性。由于上面的失败处理策略,实现了任务的高可用支持
路由策略故障转移第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等失效转移失败转移、负载均衡故障转移
动态分片
分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。默认包含三种分片策略:基于平均分配算法的分片策略、 作业名的哈希值奇偶数决定IP升降序算法的分片策略、根据作业名的哈希值对Job实例列表进行轮转的分片策略,支持自定义分片策略由用户在UI界面输入
TBSchedule的分布式机制是通过灵活的Sharding方式实现的,比如可以按所有数据的ID按10取模分片、按月份分片等,根据不同的场景由客户端配置分片规则。
任务类型普通任务、单机单进程任务、一个任务执行间隔内允许执行一次提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS等类型脚本支持OneOff,Perpetual和SequencePerpetual三种作业模式提供除java作业的基础上增加了shell作业的支持。主要可以支持类似PHP, Python等脚本作业实时任务:提交了之后立即就要执行的任务。定时任务:在指定时间点执行的任务,譬如 今天3点执行(单次)。Cron任务:CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * ? Repeat任务:譬如每隔5分钟执行一次,重复50次就停止。
任务在业务中的状态流转所有的任务都会存储在一个分布式etcd里,单个crond部署成一个服务,也就是图中所示的node.1、node.2、node.n等,然后再由web界面去管理将调度的行为抽象成“调度中心”公共平台,“调度中心”负责发起调度请求,将任务抽象成JobHandler,交由执行器管理,执行器负责接收调度请求并执行相应的JobHandler中的业务逻辑首先配置任务的相关信息,然后任务去注册中心进行注册,注册中心完成主节点选举和分片策略的设置。Quarzt job一旦触发,则任务执行。任务执行后,处理错过的任务以及监听事件将作业在逻辑上划分为若干个分片,通过作业分片调度器将作业分片指派给特定的执行节点。执行节点通过quartz触发执行作业的具体实现,在执行的时候,会将分片序号和参数作为参数传入。作业的实现逻辑需分析分片序号和分片参数,并以此为依据来调用具体的实现任务到注册中心进行注册,JobClient将任务进行提交,JobTracker接收并分配任务,TaskTracker执行任务,并将结果反馈给客户端
任务进度监控在cronweb添加完成任务之后,在任务标签页就能看到所有添加的定时任务以及执行情况支持实时监控任务进度监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。Saturn通过统一的控制台进行全部域及全部作业的配置,执行情况监控,结点监控等这些业务日志可以通过任务ID串联起来,可以在LTS-Admin中实时查看任务的执行进度schedule管理控制台。负责控制、监控任务执行状态。
自定义任务参数在cronweb添加定时任务时,可以指定任务执行脚本和参数支持在线配置调度任务入参,即时生效在修改任务时,可以自定义参数可以在界面中配置任务的参数

安全设置支持security.json安全设置调度中心向执行器发送的调度请求时使用RequestModel和ResponseModel两个对象封装调度请求参数和响应数据, 在进行通讯之前底层会将上述两个对象对象序列化,并进行数据协议以及时间戳检验,从而达到数据加密的功能



开源地址http://github.com/shunfei/cronsunhttps://github.com/xuxueli/xxl-jobhttps://github.com/elasticjob/elastic-job-litehttps://github.com/vipshop/Saturnhttps://github.com/ltsopensource/light-task-scheduler

综上,选择xxl-job。

6、XXL-JOB

详细的使用说明在官网有非常详尽的文档说明,这里只说明本地应用部分和针对源码进行分析。

框架图

6.1调度中心源码解读

因为时候springboot工程,所以直接从配置文件入手

/**
* @author xuxueli 2018-10-28 00:18:17
*/
@Component
@DependsOn("xxlJobAdminConfig")
public class XxlJobScheduler implements InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);




@Override
public void afterPropertiesSet() throws Exception {
// init i18n 国际化支持,通过配置文件中配置item的中英文实现
initI18n();


// admin registry monitor run
// 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现
JobRegistryMonitorHelper.getInstance().start();


// admin monitor run
// 启动失败日志监控线程
JobFailMonitorHelper.getInstance().start();


// admin-server
initRpcProvider();


// start-schedule
// 执行定时任务
JobScheduleHelper.getInstance().start();


logger.info(">>>>>>>>> init xxl-job admin success.");
}


@Override
public void destroy() throws Exception {


// stop-schedule
JobScheduleHelper.getInstance().toStop();


// admin trigger pool stop
JobTriggerPoolHelper.toStop();


// admin registry stop
JobRegistryMonitorHelper.getInstance().toStop();


// admin monitor stop
JobFailMonitorHelper.getInstance().toStop();


// admin-server
stopRpcProvider();
}


// ---------------------- I18n ----------------------


private void initI18n(){
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}


// ---------------------- admin rpc provider (no server version) ----------------------
//启动注册中心的RPC服务,使得执行器项目可以通过RPC进行注册和心跳检测
private static ServletServerHandler servletServerHandler;
private void initRpcProvider(){
// init
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
null,
0,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null);


// add services
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());


// servlet handler
servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
XxlRpcInvokerFactory.getInstance().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
servletServerHandler.handle(null, request, response);
}




// ---------------------- executor-client ----------------------
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}


// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}


// set-cache
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
address,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null).getObject();


executorBizRepository.put(address, executorBiz);
return executorBiz;
}


}


重点一:启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现

JobRegistryMonitorHelper.getInstance().start();
public class JobRegistryMonitorHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);


private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();
public static JobRegistryMonitorHelper getInstance(){
return instance;
}


private Thread registryThread;
private volatile boolean toStop = false;
public void start(){
// 创建一个线程
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// 当toStop 为false时进入该循环。
while (!toStop) {
try {
// auto registry group
// 获取类型为自动注册的执行器地址列表
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {


// remove dead address (admin/executor)
// 删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}


// fresh online address (admin/executor)
// 查询在90秒之内有过更新的机器列表
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
if (list != null) {
// 循环注册机器列表,根据执行器不同,将这些机器列表区分拿出来
for (XxlJobRegistry item: list) {
// 判断该机器注册信息RegistryGroup是否是EXECUTOR , EXECUTOR代表该机器是注册到执行器上面的
// RegistType分为ADMIN 和EXECUTOR
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
// 获取注册的执行器 KEY (也就是执行器)
String appName = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appName);
if (registryList == null) {
registryList = new ArrayList<String>();
}


if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
// 收集机器信息,根据执行器做区分
appAddressMap.put(appName, registryList);
}
}
}


// fresh group address
// 遍历执行器列表,根据上面的心跳检测有限的地址更新回数据库
for (XxlJobGroup group: groupList) {
// 通过执行器的APP_NAME拿出他下面的集群机器地址
List<String> registryList = appAddressMap.get(group.getAppName());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
addressListStr = "";
for (String item:registryList) {
addressListStr += item + ",";
}
// 转为为String,通过逗号分隔
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
// 将这个执行器的集群机器地址列表,写入到数据库
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
registryThread.start();
}


public void toStop(){
toStop = true;
// interrupt and wait
registryThread.interrupt();
try {
registryThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}


}


start方法主要作用是开通了一个守护线程,每隔30s扫描一次执行器的注册信息表

剔除90s内没有进行健康检查的执行器信息将自动注册类型的执行器注册信息(XxlJobRegistry)经过处理更新执行器信息(XxlJobGroup)

重点二:启动失败日志监控线程,监控任务的执行状态, 如若失败,则发送邮件预警

JobFailMonitorHelper.getInstance().start();
public class JobFailMonitorHelper {
private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class);


private static JobFailMonitorHelper instance = new JobFailMonitorHelper();
public static JobFailMonitorHelper getInstance(){
return instance;
}


// ---------------------- monitor ----------------------


private Thread monitorThread;
private volatile boolean toStop = false;
public void start(){
monitorThread = new Thread(new Runnable() {


@Override
public void run() {


// monitor
while (!toStop) {
try {
// 查询失败的failLogIds
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
// 遍历失败的日志
for (long failLogId: failLogIds) {


// lock log
// 更新提醒状态,如果已经提醒则不再提醒
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());


// 1、fail retry monitor
// 根据执行反馈判断是否继续触发任务,可能任务触发但是未收到反馈则继续触发
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}


// 2、fail alarm monitor
// 任务执行失败, 执行发送邮件等预警措施
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = true;
try {
alarmResult = failAlarm(info, log);
} catch (Exception e) {
alarmResult = false;
logger.error(e.getMessage(), e);
}
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}


XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
// 停顿一下
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
}


logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");


}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}


public void toStop(){
toStop = true;
// interrupt and wait
monitorThread.interrupt();
try {
monitorThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}




// ---------------------- alarm ----------------------


// email alarm template
private static final String mailBodyTemplate = "<h5>" + I18nUtil.getString("jobconf_monitor_detail") + ":</span>" +
"<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +
" <thead style=\"font-weight: bold;color: #ffffff;background-color: #ff8c00;\" >" +
" <tr>\n" +
" <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobgroup") +"</td>\n" +
" <td width=\"10%\" >"+ I18nUtil.getString("jobinfo_field_id") +"</td>\n" +
" <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobdesc") +"</td>\n" +
" <td width=\"10%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_title") +"</td>\n" +
" <td width=\"40%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_content") +"</td>\n" +
" </tr>\n" +
" </thead>\n" +
" <tbody>\n" +
" <tr>\n" +
" <td>{0}</td>\n" +
" <td>{1}</td>\n" +
" <td>{2}</td>\n" +
" <td>"+ I18nUtil.getString("jobconf_monitor_alarm_type") +"</td>\n" +
" <td>{3}</td>\n" +
" </tr>\n" +
" </tbody>\n" +
"</table>";


/**
* fail alarm
*
* @param jobLog
*/
private boolean failAlarm(XxlJobInfo info, XxlJobLog jobLog){
boolean alarmResult = true;


// send monitor email
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {


// alarmContent
String alarmContent = "Alarm Job LogId=" + jobLog.getId();
if (jobLog.getTriggerCode() != ReturnT.SUCCESS_CODE) {
alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();
}
if (jobLog.getHandleCode()>0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();
}


// email info
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(Integer.valueOf(info.getJobGroup()));
String personal = I18nUtil.getString("admin_name_full");
String title = I18nUtil.getString("jobconf_monitor");
String content = MessageFormat.format(mailBodyTemplate,
group!=null?group.getTitle():"null",
info.getId(),
info.getJobDesc(),
alarmContent);


Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
for (String email: emailSet) {


// make mail
try {
MimeMessage mimeMessage = XxlJobAdminConfig.getAdminConfig().getMailSender().createMimeMessage();


MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);
helper.setFrom(XxlJobAdminConfig.getAdminConfig().getEmailUserName(), personal);
helper.setTo(email);
helper.setSubject(title);
helper.setText(content, true);


XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job, job fail alarm email send error, JobLogId:{}", jobLog.getId(), e);


alarmResult = false;
}


}
}


// do something, custom alarm strategy, such as sms




return alarmResult;
}


}


也就是说这里主要作用是开通了一个守护线程,每隔10s扫描一次失败日志

如果任务失败可重试次数>0,那么重新触发任务如果任务执行失败,会进行告警,默认采用邮件形式进行告警

重点三:xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息

JobScheduleHelper.getInstance().start();

6.2客户端源码解读

实际在执行器应用中,内嵌了一个jetty服务器, 服务在xxlJobExecutor 初始化的时候启动。当执行器端启动时会定时向注册中心进行自动注册,并且当调度中心有任务触发的时候也会发起RPC请求,请求执行器执行具体的任务

public class XxlJobExecutor  {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);


// ---------------------- param ----------------------
private String adminAddresses;
private String appName;
private String ip;
private int port;
private String accessToken;
private String logPath;
private int logRetentionDays;


public void setAdminAddresses(String adminAddresses) {
this.adminAddresses = adminAddresses;
}
public void setAppName(String appName) {
this.appName = appName;
}
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) {
this.port = port;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public void setLogRetentionDays(int logRetentionDays) {
this.logRetentionDays = logRetentionDays;
}




// ---------------------- start + stop ----------------------
public void start() throws Exception {


// init logpath
// 初始化本地日志路径
XxlJobFileAppender.initLogPath(logPath);


// init invoker, admin-client
// 初始化调度中心的地址列表, 通过XxlRpcReferenceBean创建好adminBiz实例
initAdminBizList(adminAddresses, accessToken);




// init JobLogFileCleanThread
// 启动一个线程,用来清理本地日志, 默认保留最近一天的日志
JobLogFileCleanThread.getInstance().start(logRetentionDays);


// init TriggerCallbackThread
// 初始化callback进程
TriggerCallbackThread.getInstance().start();


// init executor-server
// 任务执行provider
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
public void destroy(){
// destory jobThreadRepository
if (jobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
removeJobThread(item.getKey(), "web container destroy and kill the job.");
}
jobThreadRepository.clear();
}
jobHandlerRepository.clear();




// destory JobLogFileCleanThread
JobLogFileCleanThread.getInstance().toStop();


// destory TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();


// destory executor-server
stopRpcProvider();


// destory invoker
stopInvokerFactory();
}




// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {


String addressUrl = address.concat(AdminBiz.MAPPING);


AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
serializer,
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null
).getObject();


if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
private void stopInvokerFactory(){
// stop invoker factory
try {
XxlRpcInvokerFactory.getInstance().stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
public static Serializer getSerializer() {
return serializer;
}




// ---------------------- executor-server (rpc provider) ----------------------
private XxlRpcProviderFactory xxlRpcProviderFactory = null;


private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {


// init, provider factory
// 初始化地址,如果端口为空,默认9999为jetty的服务器端口
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);


xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);


// add services
// 创建一个ExecutorService 实例,放入Map中,后面会通过class获取到他的实例执行run方法
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());


// start
// jetty + registry
xxlRpcProviderFactory.start();


}


public static class ExecutorServiceRegistry extends ServiceRegistry {


@Override
public void start(Map<String, String> param) {
// start registry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
}
@Override
public void stop() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}


@Override
public boolean registry(Set<String> keys, String value) {
return false;
}
@Override
public boolean remove(Set<String> keys, String value) {
return false;
}
@Override
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
return null;
}
@Override
public TreeSet<String> discovery(String key) {
return null;
}


}


private void stopRpcProvider() {
// stop provider factory
try {
xxlRpcProviderFactory.stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}




// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}




// ---------------------- job thread repository ----------------------
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});


JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}


return newJobThread;
}
public static void removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
}
public static JobThread loadJobThread(int jobId){
JobThread jobThread = jobThreadRepository.get(jobId);
return jobThread;
}


}


6.3客户端使用

6.3.1引入依赖

<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.1.0</version>
</dependency>

6.3.2配置

### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin


### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample


### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=


### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999


### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=


### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler


### 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;
xxl.job.executor.logretentiondays=-1

6.3.3配置类

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);


@Value("${xxl.job.admin.addresses}")
private String adminAddresses;


@Value("${xxl.job.executor.appname}")
private String appName;


@Value("${xxl.job.executor.ip}")
private String ip;


@Value("${xxl.job.executor.port}")
private int port;


@Value("${xxl.job.accessToken}")
private String accessToken;


@Value("${xxl.job.executor.logpath}")
private String logPath;


@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;




@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);


return xxlJobSpringExecutor;
}


/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}

6.3.4执行器类

/**
* 任务Handler示例(Bean模式)
*
* 开发步骤:
* 1、继承"IJobHandler":“com.xxl.job.core.handler.IJobHandler”;
* 2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;
* 3、注册到执行器工厂:添加“@JobHandler(value="自定义jobhandler名称")”注解,注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
*
* @author xuxueli 2015-12-19 19:43:36
*/
@JobHandler(value="demoJobHandler")
@Component
public class DemoJobHandler extends IJobHandler {


@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");


for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return SUCCESS;
}


}


文章转载自AweSomeBaZinGa,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论