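Hello everyone. This article explains how Elastic-Job creates a thread pool to execute jobs. Using a deliberately misbehaving job as an example, it then shows how to analyze the stacks of executing job threads with JDK tools, which helps troubleshoot problems that appear in test or production environments.
Job Execution Thread Configuration and Techniques for Analyzing Misbehaving Jobs
By 宋小生 (Song Xiaosheng)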
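7.6 Job execution threads
7.6.1 The job execution method
To run a job, the scheduler calls the execute method of the template class AbstractElasticJobExecutor. Execution takes three steps:
1. Pre-execution processing.
2. Invocation of the job's business method.
3. Post-execution processing.
The core execution code is shown below (`...` marks omitted code):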
```java
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    ...
    jobFacade.registerJobBegin(shardingContexts);
    ...
    try {
        process(shardingContexts, executionSource);
    } finally {
        // TODO Consider adding a "job failed" state, and how to handle the overall job-failure loop
        jobFacade.registerJobCompleted(shardingContexts);
        ...
    }
}
```
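registerJobBegin (before execution):
- Sets the job's running state to true in the in-memory job registry.
- If the idempotency setting (`monitorExecution`) is true, writes the `sharding/{item}/running` node for each item, marking it as executing. With this setting enabled, if the next trigger finds a running node for an item, that run is skipped (recorded as a misfire).

registerJobCompleted (after execution, in the finally block):
- Sets the job's running state to false in the in-memory job registry.
- If the idempotency setting (`monitorExecution`) is true, removes the `sharding/{item}/running` node for each item.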
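The begin/complete bookkeeping described above can be modeled in a few lines of plain Java. This is a simplified sketch, not Elastic-Job code: `RunningNodeDemo` and its methods are hypothetical, a concurrent set stands in for the ZooKeeper `sharding/{item}/running` nodes, and a boolean field stands in for the in-memory job registry. It shows why the finally block matters: the running nodes are cleared even when the business logic throws.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the bookkeeping around one job run (not the Elastic-Job API). */
class RunningNodeDemo {

    private final Set<String> runningNodes = ConcurrentHashMap.newKeySet(); // stands in for ZooKeeper nodes
    private volatile boolean jobRunning;                                     // stands in for the in-memory registry
    private final boolean monitorExecution;                                  // the "idempotency" switch

    RunningNodeDemo(boolean monitorExecution) {
        this.monitorExecution = monitorExecution;
    }

    private static String runningNode(int item) {
        return "sharding/" + item + "/running";
    }

    void registerJobBegin(Collection<Integer> items) {
        jobRunning = true;
        if (monitorExecution) {
            items.forEach(item -> runningNodes.add(runningNode(item)));
        }
    }

    void registerJobCompleted(Collection<Integer> items) {
        jobRunning = false;
        if (monitorExecution) {
            items.forEach(item -> runningNodes.remove(runningNode(item)));
        }
    }

    /** True if any item still has a running node, i.e. a new trigger would be skipped. */
    boolean wouldMisfire(Collection<Integer> items) {
        return items.stream().anyMatch(item -> runningNodes.contains(runningNode(item)));
    }

    boolean isJobRunning() {
        return jobRunning;
    }

    /** Template method: before -> business logic -> after, with "after" guaranteed by finally. */
    void execute(Collection<Integer> items, Runnable businessLogic) {
        registerJobBegin(items);
        try {
            businessLogic.run();
        } finally {
            registerJobCompleted(items);
        }
    }

    public static void main(String[] args) {
        RunningNodeDemo demo = new RunningNodeDemo(true);
        List<Integer> items = Arrays.asList(0, 1, 2);
        demo.execute(items, () ->
                System.out.println("during run: misfire=" + demo.wouldMisfire(items))); // misfire=true
        System.out.println("after run: misfire=" + demo.wouldMisfire(items));           // misfire=false
    }
}
```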
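Next, let's look at process, the method that actually runs the job. It has two branches:
- If only one sharding item is assigned to this instance, the job logic runs in the current thread.
- If more than one item is assigned, each item runs in a child thread taken from the thread pool.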
```java
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    // Only one sharding item: call process directly, which in turn calls the concrete
    // executor's process method to run the job's business logic
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    // Several sharding items: run them in parallel. Create a latch and submit one task per item
    // to the thread pool; each task counts the latch down when it finishes, and this thread
    // continues only after all tasks are done
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
```
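With a single sharding item, the job runs directly in the Quartz worker thread. With multiple items, a thread pool manages the threads: each item's task is submitted to the pool, and a CountDownLatch counts the finished tasks. The Quartz worker thread waits until all items have finished, and only then does the job execution complete.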
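To see which thread ends up running each item, the branching in process can be reproduced with plain JDK types. In this sketch `ShardDispatcher` and its `IntConsumer` callback are hypothetical stand-ins for AbstractElasticJobExecutor and the per-item process call, and a fixed pool from `Executors` replaces Elastic-Job's own pool.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

/** Hypothetical stand-in for AbstractElasticJobExecutor.process: same branching, plain JDK types only. */
class ShardDispatcher {

    private final ExecutorService executorService;

    ShardDispatcher(ExecutorService executorService) {
        this.executorService = executorService;
    }

    void process(Collection<Integer> items, IntConsumer perItem) {
        if (items.size() == 1) {
            // One item: run inline on the calling thread (the Quartz worker thread in Elastic-Job)
            perItem.accept(items.iterator().next());
            return;
        }
        // Several items: one pool task per item, tracked by a latch
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(() -> {
                try {
                    perItem.accept(each);
                } finally {
                    latch.countDown(); // runs even if perItem throws
                }
            });
        }
        try {
            latch.await(); // the calling thread blocks here until every item has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ShardDispatcher dispatcher = new ShardDispatcher(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        IntConsumer record = item -> log.add("item " + item + " on " + Thread.currentThread().getName());
        dispatcher.process(Collections.singletonList(0), record); // runs on the calling thread
        dispatcher.process(Arrays.asList(0, 1, 2), record);       // runs on pool threads
        pool.shutdown();
        log.forEach(System.out::println);
    }
}
```

Running main prints one line for the single-item call on `main`, followed by three lines from pool threads in no fixed order.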
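A quick introduction to CountDownLatch:
CountDownLatch lets one thread wait until a set of other threads has finished its work before it continues. It is built around a counter whose initial value is the number of tasks to wait for. Each time a task finishes, the counter is decremented by one. When it reaches 0, all tasks are done, and the threads waiting on the CountDownLatch resume.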
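A minimal CountDownLatch sketch follows (the class `LatchDemo` and its `runAll` method are hypothetical). It also shows why the Elastic-Job code calls `countDown()` in a finally block: if a task throws and skips `countDown()`, the count never reaches zero and `await()` blocks forever. A bounded `await(timeout, unit)` is used here so that such a bug would surface as an exception instead of a hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {

    /** Runs taskCount tasks; the task with index failIndex throws. Returns how many tasks completed normally. */
    static int runAll(int taskCount, int failIndex) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        CountDownLatch latch = new CountDownLatch(taskCount); // initial count = number of tasks
        AtomicInteger succeeded = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            final int index = i;
            pool.submit(() -> {
                try {
                    if (index == failIndex) {
                        throw new IllegalStateException("task " + index + " failed");
                    }
                    succeeded.incrementAndGet();
                } finally {
                    latch.countDown(); // decrement even on failure, otherwise await() would hang
                }
            });
        }
        boolean finished = latch.await(5, TimeUnit.SECONDS); // bounded wait instead of blocking forever
        pool.shutdown();
        if (!finished) {
            throw new IllegalStateException("latch did not reach zero");
        }
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(3, 1)); // prints 2
    }
}
```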
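Now let's take a closer look at executorService, the thread pool used when a scheduled job has multiple sharding items.
7.6.2 The job execution thread pool: executorService
How is executorService created? It is a member field initialized in the constructor of AbstractElasticJobExecutor: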
```java
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    ...
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    ...
}
```
First, the thread-pool handler is obtained. It is loaded by reflection, which makes it an extension point: getHandler reads the class name configured for the job property, instantiates it if it implements the expected type, and otherwise falls back to the default handler.

```java
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
    String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
    try {
        Class<?> handlerClass = Class.forName(handlerClassName);
        if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
            return handlerClass.newInstance();
        }
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    } catch (final ReflectiveOperationException ex) {
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    }
}
```
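The reflect-then-fall-back pattern in getHandler can be sketched without Elastic-Job types. `PoolHandler`, `DefaultPoolHandler`, `CustomPoolHandler` and `HandlerLoader` are hypothetical names. One deliberate difference: the sketch calls `getDeclaredConstructor().newInstance()` instead of `Class.newInstance()`, which is deprecated since Java 9.

```java
/** Hypothetical handler interface standing in for ExecutorServiceHandler. */
interface PoolHandler {
    String name();
}

class DefaultPoolHandler implements PoolHandler {
    public String name() {
        return "default";
    }
}

class CustomPoolHandler implements PoolHandler {
    public String name() {
        return "custom";
    }
}

class HandlerLoader {

    /**
     * Instantiates className via reflection if it implements PoolHandler; otherwise
     * (blank name, unknown class, wrong type, no usable no-arg constructor) falls back to the default.
     */
    static PoolHandler load(String className) {
        if (className == null || className.isEmpty()) {
            return new DefaultPoolHandler();
        }
        try {
            Class<?> handlerClass = Class.forName(className);
            if (PoolHandler.class.isAssignableFrom(handlerClass)) {
                return (PoolHandler) handlerClass.getDeclaredConstructor().newInstance();
            }
            return new DefaultPoolHandler(); // loadable, but not a PoolHandler
        } catch (ReflectiveOperationException ex) {
            return new DefaultPoolHandler(); // e.g. ClassNotFoundException
        }
    }

    public static void main(String[] args) {
        System.out.println(load(CustomPoolHandler.class.getName()).name()); // custom
        System.out.println(load("java.lang.String").name());              // default (wrong type)
        System.out.println(load("com.example.Missing").name());           // default (class not found)
    }
}
```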
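If no custom handler is configured, the default DefaultExecutorServiceHandler is used:

```java
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {

    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}
```

It delegates to ExecutorServiceObject, passing the thread name prefix inner-job-{jobName} and a thread count of twice the number of available processors. The ExecutorServiceObject constructor and its createExecutorService method look like this: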
```java
public ExecutorServiceObject(final String namingPattern, final int threadSize) {
    // Work queue
    workQueue = new LinkedBlockingQueue<>();
    // Create the thread pool
    threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
            new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
    // Allow core threads to be destroyed once the idle timeout expires
    threadPoolExecutor.allowCoreThreadTimeOut(true);
}

/**
 * Creates the thread pool service object.
 *
 * @return the thread pool service object
 */
public ExecutorService createExecutorService() {
    return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
}
```
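The thread pool parameters are:

| Parameter | Default | Description |
| --- | --- | --- |
| corePoolSize | available processors × 2 | Number of core threads |
| maximumPoolSize | available processors × 2 | Maximum number of threads: at most 2 × processors threads are created. For IO-bound work such as jobs, this keeps the CPU well utilized |
| keepAliveTime | 5L | Idle timeout, read together with unit: an idle thread is reclaimed after 5 minutes (core threads included, because allowCoreThreadTimeOut(true) is set) |
| unit | TimeUnit.MINUTES | Time unit of keepAliveTime (minutes) |
| workQueue | LinkedBlockingQueue | Unbounded queue, so submissions are never rejected and items beyond the thread count simply wait. On a single machine, a job's next run cannot start until the current one has finished, so how many tasks queue up depends on the number of sharding items, and in practice the queue stays small |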
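The pool configuration in the table can be approximated with JDK classes only. This is a sketch under assumptions: `JobPoolFactory` is a hypothetical name, and a small counter-based `ThreadFactory` replaces commons-lang3's `BasicThreadFactory`, producing names in the same `inner-job-{jobName}-N` form. The Guava wrapping is left out here.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only approximation of ExecutorServiceObject's pool (no commons-lang3 or Guava). */
class JobPoolFactory {

    /** Names threads prefix-1, prefix-2, ..., like a "prefix-%s" naming pattern with a counter starting at 1. */
    static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    static ThreadPoolExecutor create(String jobName) {
        int threadSize = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threadSize, threadSize,        // core == max: a fixed-size pool
                5L, TimeUnit.MINUTES,          // idle threads are reclaimed after 5 minutes
                new LinkedBlockingQueue<>(),   // unbounded: extra tasks wait, none are rejected
                namedThreadFactory("inner-job-" + jobName));
        executor.allowCoreThreadTimeOut(true); // the 5-minute timeout also applies to core threads
        return executor;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = create("javaSimpleJob");
        String name = executor.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // inner-job-javaSimpleJob-1
        executor.shutdown();
    }
}
```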
Once the pool is created, createExecutorService wraps it with Guava's concurrency utilities:
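- getExitingExecutorService: converts a ThreadPoolExecutor into an ExecutorService that exits automatically when the application finishes. It does this by switching the pool to daemon threads and registering a JVM shutdown hook that waits for running tasks, which is why the inner-job-* threads appear as daemon threads in the jstack output later in this section.
- listeningDecorator: converts an ExecutorService into a ListeningExecutorService, whose submit() method returns a ListenableFuture. The Futures utility class can then attach callbacks to it (a feature not used here).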
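Guava documents getExitingExecutorService as using daemon threads plus a shutdown hook that waits for the tasks to finish. The JDK-only sketch below approximates that behaviour. `ExitingExecutorSketch` is a hypothetical name and this is not Guava's implementation; the 120-second grace period in main is an arbitrary choice, and listeningDecorator is omitted because it requires Guava.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExitingExecutorSketch {

    static ExecutorService makeExiting(ThreadPoolExecutor executor, long timeout, TimeUnit unit) {
        // 1. Wrap the existing factory so every newly created worker is a daemon thread.
        ThreadFactory original = executor.getThreadFactory();
        executor.setThreadFactory(runnable -> {
            Thread thread = original.newThread(runnable);
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        });
        // 2. On JVM shutdown, stop accepting work and give running tasks a grace period.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdown();
            try {
                executor.awaitTermination(timeout, unit);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "exiting-executor-shutdown-hook"));
        // 3. Hide ThreadPoolExecutor's configuration methods from callers.
        return Executors.unconfigurableExecutorService(executor);
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 5L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
        ExecutorService exiting = makeExiting(pool, 120, TimeUnit.SECONDS);
        System.out.println(exiting.submit(() -> Thread.currentThread().isDaemon()).get()); // true
        // No shutdown() call: the daemon worker threads let the JVM exit once main returns.
    }
}
```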
At this point the thread pool has been created.
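7.6.3 Using job thread stacks to troubleshoot misbehaving jobs
Why look at the threads a job runs in? When jobs run in production, we often have to investigate execution problems, such as a job that runs far too long or appears stuck. Such problems are hard to find from logs alone. In that case, the JDK's jstack command can show what every thread is doing and reveal the stuck job threads.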
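jstack is the tool used in the rest of this section. When attaching a tool to the process is not convenient, a similar filtered view can be taken from inside the JVM with `ThreadMXBean.dumpAllThreads`. The sketch below is a hypothetical helper, `JobThreadDump`, that imitates `jstack | grep` by thread name; it is not part of Elastic-Job.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class JobThreadDump {

    /** Like "jstack | grep keyword": one entry per live thread whose name contains keyword, with its top frames. */
    static List<String> dump(String keyword, int maxFrames) {
        List<String> entries = new ArrayList<>();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().contains(keyword)) {
                continue;
            }
            StringBuilder entry = new StringBuilder()
                    .append('"').append(info.getThreadName()).append("\" ")
                    .append(info.getThreadState());
            StackTraceElement[] frames = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, frames.length); i++) {
                entry.append("\n\tat ").append(frames[i]);
            }
            entries.add(entry.toString());
        }
        return entries;
    }

    /** Starts a daemon thread that sleeps like the stuck job, and returns once it is actually sleeping. */
    static Thread startSleeper(String name) throws InterruptedException {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(TimeUnit.MINUTES.toMillis(30));
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, name);
        sleeper.setDaemon(true);
        sleeper.start();
        while (sleeper.isAlive() && sleeper.getState() != Thread.State.TIMED_WAITING) {
            Thread.sleep(10);
        }
        return sleeper;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread sleeper = startSleeper("inner-job-demoJob-1");
        dump("demoJob", 3).forEach(System.out::println);
        sleeper.interrupt();
    }
}
```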
7.6.3.1 Simulating a misbehaving job
Using the elastic-job-example-lite-java module from the examples shipped with the source code, create the following job:
```java
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
    JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
    new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).monitorPort(9888).build(), jobEventConfig).init();
}
```

The job is triggered every 5 seconds and has 3 sharding items. Its implementation, JavaSimpleJob, sleeps for 30 minutes inside execute to simulate a stuck job:

```java
public class JavaSimpleJob implements SimpleJob {

    private FooRepository fooRepository = FooRepositoryFactory.getFooRepository();

    @Override
    public void execute(final ShardingContext shardingContext) {
        System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
        try {
            // Sleep for 30 minutes to simulate a stuck job
            Thread.sleep(30*60*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
        for (Foo each : data) {
            fooRepository.setCompleted(each.getId());
        }
    }
}
```
After starting the example, use jps -l to find its process ID; here com.dangdang.ddframe.job.example.JavaMain has PID 59077:

```
➜ ~ jps -l
59073 org.jetbrains.jps.cmdline.Launcher
58882 org.jetbrains.idea.maven.server.RemoteMavenServer36
59077 com.dangdang.ddframe.job.example.JavaMain
58823
60221 sun.tools.jps.Jps
```

Then filter the jstack output by the job name:

```
➜ ~ jstack -l 59077 |grep javaSimpleJob
"inner-job-javaSimpleJob-3" #95 daemon prio=5 os_prio=31 tid=0x00007fa9b51d2000 nid=0x13d03 waiting on condition [0x00007000130f1000]
"inner-job-javaSimpleJob-2" #81 daemon prio=5 os_prio=31 tid=0x00007fa9b3a17800 nid=0x14907 waiting on condition [0x00007000124cd000]
"inner-job-javaSimpleJob-1" #64 daemon prio=5 os_prio=31 tid=0x00007fa9b31ee800 nid=0x8603 waiting on condition [0x00007000119ac000]
"javaSimpleJob_QuartzSchedulerThread" #29 prio=5 os_prio=31 tid=0x00007fa9b5120800 nid=0x6803 in Object.wait() [0x0000700010267000]
"javaSimpleJob_Worker-1" #28 prio=5 os_prio=31 tid=0x00007fa9b5091800 nid=0x9b03 waiting on condition [0x0000700010164000]
```
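The output contains five threads related to the job:
- three sharding-item execution threads whose names start with inner-job-javaSimpleJob;
- one Quartz scheduler thread, javaSimpleJob_QuartzSchedulerThread;
- one Quartz worker thread, javaSimpleJob_Worker-1.

To see each thread's details, append -C to the grep command. -C n prints n lines of context before and after each matching line; choose n according to how deep the stacks you need are. --col is an abbreviation of grep's --color option, which highlights the matches. The command is:

```
jstack -l 59077 |grep javaSimpleJob -C 10 --col
```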
The result:

```
➜ ~ jstack -l 59077 |grep javaSimpleJob -C 10 --col
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:
	- None
"inner-job-javaSimpleJob-3" #95 daemon prio=5 os_prio=31 tid=0x00007fa9b51d2000 nid=0x13d03 waiting on condition [0x00007000130f1000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
--
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:
	- None
"inner-job-javaSimpleJob-2" #81 daemon prio=5 os_prio=31 tid=0x00007fa9b3a17800 nid=0x14907 waiting on condition [0x00007000124cd000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
--
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:
	- None
"inner-job-javaSimpleJob-1" #64 daemon prio=5 os_prio=31 tid=0x00007fa9b31ee800 nid=0x8603 waiting on condition [0x00007000119ac000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
--
"Timer-0" #30 daemon prio=5 os_prio=31 tid=0x00007fa9b3155800 nid=0x6903 in Object.wait() [0x000070001036a000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.util.TimerThread.mainLoop(Timer.java:552)
	- locked <0x00000006c0035c20> (a java.util.TaskQueue)
	at java.util.TimerThread.run(Timer.java:505)
   Locked ownable synchronizers:
	- None
"javaSimpleJob_QuartzSchedulerThread" #29 prio=5 os_prio=31 tid=0x00007fa9b5120800 nid=0x6803 in Object.wait() [0x0000700010267000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at org.quartz.simpl.SimpleThreadPool.blockForAvailableThreads(SimpleThreadPool.java:452)
	- locked <0x00000006c0035e80> (a java.lang.Object)
	at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:263)
   Locked ownable synchronizers:
	- None
"javaSimpleJob_Worker-1" #28 prio=5 os_prio=31 tid=0x00007fa9b5091800 nid=0x9b03 waiting on condition [0x0000700010164000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000006c007cd48> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:193)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
```
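- javaSimpleJob_QuartzSchedulerThread, the job's Quartz scheduler thread: it is in the TIMED_WAITING state and stays in blockForAvailableThreads, waiting for a worker thread to become available. The job's Quartz worker thread, javaSimpleJob_Worker-1 (the only one in the output), is still busy, so the next trigger cannot start.
- javaSimpleJob_Worker-1, the job's Quartz worker thread: it is in the WAITING state and stays in the CountDownLatch await call inside AbstractElasticJobExecutor.process, the method examined above, waiting for the sharding-item threads to finish.
- inner-job-javaSimpleJob-3, inner-job-javaSimpleJob-2 and inner-job-javaSimpleJob-1, the job's three sharding-item threads: they are in the TIMED_WAITING state and stay at line 39 of JavaSimpleJob, which is our sleep call Thread.sleep(30*60*1000);.

Analyzing these stacks shows that the job is still executing and is stuck in its sharding-item threads, and that the offending code is at line 39 of JavaSimpleJob.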
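The same thread picture can be reproduced with plain JDK code, which is handy for practicing this kind of stack analysis without ZooKeeper or Quartz. In this hypothetical sketch, `StuckJobDemo.runJob` plays the Quartz worker (fan out, then await the latch) and `businessLogic` plays JavaSimpleJob.execute; `firstAppFrame` returns the first frame outside the `java.*`, `jdk.*` and `sun.*` packages, which is roughly how line 39 of JavaSimpleJob was located above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only reproduction of the stuck-job thread layout seen in the jstack output. */
class StuckJobDemo {

    /** Stand-in for JavaSimpleJob.execute: the "stuck" business code. */
    static void businessLogic(int item) {
        try {
            Thread.sleep(TimeUnit.MINUTES.toMillis(30));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stand-in for the Quartz worker running AbstractElasticJobExecutor.process: fan out, then wait. */
    static void runJob(ExecutorService pool, int items) {
        CountDownLatch latch = new CountDownLatch(items);
        for (int i = 0; i < items; i++) {
            final int item = i;
            pool.submit(() -> {
                try {
                    businessLogic(item);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** First frame outside the JDK: answers "which method of my code is this thread stuck in?". */
    static String firstAppFrame(Thread thread) {
        for (StackTraceElement frame : thread.getStackTrace()) {
            String cls = frame.getClassName();
            if (!cls.startsWith("java.") && !cls.startsWith("jdk.") && !cls.startsWith("sun.")) {
                return cls + "." + frame.getMethodName();
            }
        }
        return "<JDK frames only>";
    }

    /** Starts the scenario and returns [worker, item threads...] once all have reached their stuck states. */
    static List<Thread> start(int items) throws InterruptedException {
        List<Thread> itemThreads = new CopyOnWriteArrayList<>();
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(items, runnable -> {
            Thread created = new Thread(runnable, "inner-job-javaSimpleJob-" + counter.incrementAndGet());
            created.setDaemon(true);
            itemThreads.add(created);
            return created;
        });
        Thread worker = new Thread(() -> runJob(pool, items), "javaSimpleJob_Worker-1");
        worker.setDaemon(true);
        worker.start();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        while (worker.getState() != Thread.State.WAITING || itemThreads.size() < items
                || !itemThreads.stream().allMatch(t -> t.getState() == Thread.State.TIMED_WAITING)) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("threads did not reach the expected states");
            }
            Thread.sleep(10);
        }
        List<Thread> all = new ArrayList<>();
        all.add(worker);
        all.addAll(itemThreads);
        return all;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Thread> threads = start(3);
        for (Thread thread : threads) {
            System.out.println(thread.getName() + " " + thread.getState() + " at " + firstAppFrame(thread));
        }
        threads.forEach(Thread::interrupt); // release the demo threads
    }
}
```

Running main should report the worker as WAITING in runJob and the three inner-job-javaSimpleJob-N threads as TIMED_WAITING in businessLogic, mirroring the jstack analysis above.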




