大家好,本文给大家介绍一下Elastic-Job 是如何通过启动内置定时器来定时调解分布式场景下作业异常状态,在分布式场景下有些异常状态不太好直接处理使用定时器可以对作业做一些补偿型的健康诊断,保证作业在分布式环境下健康状态
Elastic-Job 为作业开启健康诊断定时器
文 | 宋小生
6.6.9 诊断服务
调解分布式作业不一致状态的服务,在分布式场景下有些业务不太容易直接处理,可以使用一个定时器定期检查节点的健康状态来定期去维护作业的健康状态。
接下来我们继续看初始化方法中是如何启动定时器来调解服务的,这个启动的方式与其他代码初始化不太一样,先判断了是否未运行,如果未运行则调用诊断服务的startAsync来启动内部定时器,如下所示。
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
这个ReconcileService 类通过继承guava包中的抽象定时器AbstractScheduledService来实现定时器来定时诊断服务异常状态
public final class ReconcileService extends AbstractScheduledService
先来了解下guava包中的AbstractScheduledService类型,此类用于在运行时处理一些周期性的任务。我们先来看下AbstractScheduledService类型提供了哪些方法:
| 方法 | .............. | 说明 |
|---|---|---|
| runOneIteration() | 运行计划任务的一个迭代。如果此方法的任何调用引发异常,则服务将转换为service.State.FAILED状态,并且不再调用此方法,我们需要写的作业执行业务一般在这个方法里面 | |
| startUp() | 启动服务,定时器启动的时候会调用这方法,可以在作业执行之前做一些初始化操作 | |
| shutDown() | 停止服务。这保证不会与runOneIteration同时运行,在作业停止运行时候执行可以重写此方法实现一些关闭逻辑 | |
| scheduler() | 返回用于配置此服务的AbstractScheduledService.Scheduler对象。此方法将只调用一次。Scheduler对象在创建的时候可以通过创建不同的Scheduler对象来决定以不同的频率执行定时任务 | |
| executor() | 用于创建执行器对象,返回的对象将用于执行startUp、runOneIteration和shutDown方法的ScheduledExecutorService。如果重写此方法,则当此服务终止或失败时,不会关闭执行器。子类可以重写此方法以提供自定义ScheduledExecutorService实例。此方法保证只调用一次。默认情况下,此方法返回一个新的ScheduledExecutorService,其中包含一个单线程线程池,该线程池将线程的名称设置为服务名称。此外,当服务终止或失败时,池将关闭 | |
| isRunning() | 返回运行状态 | |
| state() | 返回定时任务的生命周期状态,这个是个枚举类型列举到的状态如下:NEW,STARTING,RUNNING,STOPPING,TERMINATED,FAILED | |
| addListener(Listener listener, Executor executor) | 为作业执行添加监听器,在作业每个执行的生命周期状态变更的时候会调用监听器中的方法 | |
| startAsync() | 启动定时器方法,如果服务状态为service.state.NEW,则会启动服务并立即返回。停止的服务不能重新启动。 | |
| stopAsync() | 如果服务正在启动或运行,则会启动服务关闭并立即返回。如果服务是新的,则在未启动或停止的情况下终止服务。如果服务已经停止,此方法将立即返回而不采取任何操作 | |
| awaitRunning() | 等待服务达到运行状态 | |
| awaitTerminated() | 等待服务达到终止状态 |
当外部调用startAsync() 方法时候,子类可以实现 runOneIteration()方法定义一个周期执行的任务。
通过重写scheduler()方法来设置定时频率,这里设置定时器执行频率的方法如下:
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
fixedDelay它的间隔时间是根据上次的任务结束的时候开始计时的。
另外还有个模式为fixedRate则是按其规定的时间来执行的,不会考虑作业上次什么时候结束,犹如时刻表一般。
这里设置了首次执行延迟时间为0,间隔1分钟执行一次, 好了我们来看下定时器在执行的时候会触发到的核心执行方法:
@Override
protected void runOneIteration() throws Exception {
//从Zookeeper上读取作业配置
LiteJobConfiguration config = configService.load(true);
//读取用户配置的诊断时间间隔
int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
//超过诊断间隔周期则执行
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
//当前是主节点,需要进行分片,但是分片标记不存在
if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
shardingService.setReshardingFlag();
}
}
}
主要来说一下主要的业务处理:
先来看下当前处理业务需要满足的3个条件如下:
先判断当前节点是否是主节点.如果主节点正在选举中而导致取不到主节点, 则阻塞至主节点选举完成再返回,这里需要当前节点是主节点。
并且当前没有分片标记。
并且在已经分片的所有分片中,发现有实例进程已经与Zookeeper断开(临时实例节点instance已经被移除)。
用一句话来说明上述条件就是:当前是主节点,需要进行分片,但是分片标记不存在就设置分片标记,在作业执行之前让主节点来执行分片逻辑。其实在前面我们了解过作业实例节点移除的监听器,当有实例节点被移除(一般是与Zookeeper断开连接产生的)这种情况下便会重新设置分片标记,不过在分布式场景下作业节点的监听是与Zookeeper网络交互触发的,如果由于进程重启或者网络异常导致无法通过监听器设置分片标记这里定时器便可以进行补偿操作,定时检测然后设置分片标记。
当然这个定时器后期的扩展也可以帮助检测一下作业节点的健康状态辅助调度作业更健康的运行。
- END -




