暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Elastic-Job2.1.5源码-作业执行线程池的配置与执行异常作业问题分析技巧

中间件源码 2021-09-06
1563

大家好,本文给大家介绍一下Elastic-Job 中创建线程池来执行作业,并同时使用异常作业例子和使用JDK工具分析异常执行作业的堆栈,来排查在测试或者产线环境出现的问题



作业执行线程配置与执行异常作业问题分析技巧

文 | 宋小生



7.6 作业执行线程

7.6.1 作业执行方法

调度作业调用模版AbstractElasticJobExecutor类型中的execute方法进行作业的执行,一共分为3个步骤:

  • 作业执行之前的处理。

  • 作业业务方法调用。

  • 作业执行之后的处理。

主要看如下如下核心执行代码(...为跳过的代码):

private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        ...
        jobFacade.registerJobBegin(shardingContexts);
        ...
        try {
            process(shardingContexts, executionSource);
        } finally {
            // TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
            jobFacade.registerJobCompleted(shardingContexts);
           ...
        }
    }

作业执行之前做如下操作:
  • 在内存中的作业注册表中设置作业的运行状态为true。

  • 如果幂等配置为true,则写入sharding/分片项/running 节点,代表作业处于执行中的状态(幂等开启下次作业分片执行的时候发现当前分片存在running节点则会错过作业执行)。


作业执行之后做如下操作:
  • 在内存中的作业注册表中设置作业的运行状态为false。

  • 如果幂等配置为false,则移除sharding/分片项/running 节点。


接下来主要看作业执行的处理方法process:

主要分为两大块:

  • 分片总数为1则在当前线程中执行作业逻辑

  • 分片总数大于1则使用线程池在子线程中执行作业

 private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        //如果当前只有一个分片可执行项则直接调用process然后调用子类作业执行器的process方法来执行业务实现的作业具体信息
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        //如果是多个分片项那就需要来并行执行,先创建一个计数器,然后循环遍历分片项,将任务放进线程池中执行,当任务全部执行完毕则计数器减1直到所有任务执行完毕则继续往下执行
        final CountDownLatch latch = new CountDownLatch(items.size());
        for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                @Override
   public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


单个分片的情况下只在在Quartz的工作线程中执行,多个分片的情况下则使用线程池管理线程,将每个分片的任务提交到线程池中来执行,使用CountDownLatch计数器来计数,待所有作业执行完毕则完成作业的执行,否则一直等待

这里简单介绍一下CountDownLatch的用法:

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

然后我们要详细看下调度作业在多个分片的情况下使用到的线程池executorService对象

7.6.2 作业执行线程池executorService对象

线程池对象executorService是如何创建的呢,其实是在AbstractElasticJobExecutor对象的构造器中初始化的这个成员变量,如下代码:

   protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        ...
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
        ...
   }
    


先来看线程池处理器对象的获取,这里使用反射的模式进行扩展:
  private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
        String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
        try {
            Class<?> handlerClass = Class.forName(handlerClassName);
            if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
                return handlerClass.newInstance();
            }
            return getDefaultHandler(jobPropertiesEnum, handlerClassName);
        } catch (final ReflectiveOperationException ex) {
            return getDefaultHandler(jobPropertiesEnum, handlerClassName);
        }
    }
    


如果用户需要自行创建线程池就需要实现ExecutorServiceHandler接口,然后在作业配置中通过变量executor_service_handler来配置作业线程处理器的全路径来扩展作业的线程池对象初始化,这里获取到自定义线程处理器配置存在的话则使用反射创建线程处理器对象,如果存在用户自定义配置作业执行线程池处理器类型,则使用用户自定义配置类型,否则使用默认类型,默认的线程池处理器类型为DefaultExecutorServiceHandler,代码如下:
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {
    
    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}


接下来就是调用createExecutorService方法创建线程池,这里需要注意的是工作线程的前缀都是以inner-job开头的后面跟着我们的作业名字,使用这个线程名字可以用来帮忙排查问题,先来看创建线程的代码,如下所示:
    new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();


在ExecutorServiceObject类型中初始化的构造器和创建方法代码如下:
  public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        //工作队列
        workQueue = new LinkedBlockingQueue<>();
        //创建线程池
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue, 
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        //设置是否允许核心线程超时之后被销毁
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }
    
    /**
     * 创建线程池服务对象.
     *
     * @return 线程池服务对象
     */

    public ExecutorService createExecutorService() {
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }


线程池的创建一般我们需要了解几个重要的参数,那接下来我们可以看下默认的线程池创建使用了哪些参数:
参数
默认值
说明
corePoolSize
可用处理器数量*2
核心线程数量
maximumPoolSize可用处理器数量*2最大线程数量,说明最多只能创建处理器*2个线程,针对任务这种IO密集型来说可以高效利用CPU资源
keepAliveTime
5L
空闲线程,5分钟超时与时间单元一起
unit
TimeUnit.MINUTES
超时时间单位,分钟
workQueue
LinkedBlockingQueue
无界队列,每个作业在单台机器上执行的时候一次未执行完毕下次作业是无法执行的,这个受分片大小的影响,一般不会出现队列满的情况

线程池创建完毕之后我们来看下最终使用Guava的并发包对线程池对象的包装的处理

  /**
     * 创建线程池服务对象.
     *
     * @return 线程池服务对象
     */

    public ExecutorService createExecutorService() {
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }


  • getExitingExecutorService方法:把一个ThreadPoolExecutor实例转成一个应用结束后自动退出的ExecutorService实例。

  •  listeningDecorator:把一个ExecutorService实例转成一个ListeningExecutorService实例,ListeningExecutorService实例的submit()方法返回值是一个ListenableFuture实例,利用Futures工具类可以给这个实例添加callback回调(不过我们并没有用到这个特性)。

到这里线程池创建完成了

7.6.3 使用作业线程堆栈信息来排查异常作业

为什么要了解下作业执行时候的线程呢,因为在我们产线运行作业的时候经常需要排查作业执行问题,比如作业执行时间过长,作业卡住这种问题,通过日志难以排查和发现问题,那怎么办呢,这个时候可以使用jdk自带的jstack命令来帮忙分析线程执行情况,来发现卡住的作业线程

7.6.3.1 模拟异常作业

现在以源码提供的example中elastic-job-example-lite-java为例,创建如下作业:

 private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob""0/5 * * * * ?"3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
        new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).monitorPort(9888).build(), jobEventConfig).init();
    }


作业名字为javaSimpleJob,作业执行方法如下:
public class JavaSimpleJob implements SimpleJob {
    
    private FooRepository fooRepository = FooRepositoryFactory.getFooRepository();
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
                shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
        try {
            //写入休眠代码休眠30分钟
            Thread.sleep(30*60*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
        for (Foo each : data) {
            fooRepository.setCompleted(each.getId());
        }
    }
}


创建了如上作业,作业名字为javaSimpleJob,作业类型为SimpleJob类型作业,作业一共有3个分片,在作业执行的方法中我们添加一行休眠代码 Thread.sleep(30*60*1000);休眠30分钟用来模拟程序代码执行的时候执行了一段执行比较慢的代码(可能是慢查询导致的,也可能是数据量比较大导致的执行比较慢),接下来我们启动进程,我们使用jdk自带的工具来分析作业阻塞的问题
7.6.3.2 查看作业进程PID
接下来执行jps -l命令来查看当前机器上启动的java进程如下:
 ➜  ~ jps -l
59073 org.jetbrains.jps.cmdline.Launcher
58882 org.jetbrains.idea.maven.server.RemoteMavenServer36
59077 com.dangdang.ddframe.job.example.JavaMain
58823
60221 sun.tools.jps.Jps


可以看到当前的作业进程为59077

7.6.3.3 分析当前作业异常线程
接下来执行如下 命令来获取当前作业相关的线程信息:
jstack -l 59077 |grep javaSimpleJob


结果如下:
➜  ~ jstack -l 59077 |grep javaSimpleJob
"inner-job-javaSimpleJob-3" #95 daemon prio=5 os_prio=31 tid=0x00007fa9b51d2000 nid=0x13d03 waiting on condition [0x00007000130f1000]
"inner-job-javaSimpleJob-2" #81 daemon prio=5 os_prio=31 tid=0x00007fa9b3a17800 nid=0x14907 waiting on condition [0x00007000124cd000]
"inner-job-javaSimpleJob-1" #64 daemon prio=5 os_prio=31 tid=0x00007fa9b31ee800 nid=0x8603 waiting on condition [0x00007000119ac000]
"javaSimpleJob_QuartzSchedulerThread" #29 prio=5 os_prio=31 tid=0x00007fa9b5120800 nid=0x6803 in Object.wait() [0x0000700010267000]
"javaSimpleJob_Worker-1" #28 prio=5 os_prio=31 tid=0x00007fa9b5091800 nid=0x9b03 waiting on condition [0x0000700010164000]


在这里我们可以看到作业在执行的时候一共会有5个线程:
  • 3个是inner-job-javaSimpleJob开头的分片执行线程

  • 1个为Quartz的定时器线程,线程名为javaSimpleJob_QuartzSchedulerThread

  • 1个为Quartz创建的工作线程,线程名为javaSimpleJob_Worker-1


接下来可以在最后加上-C 来查看当前线程的详细信息,-C是当前行数的前后n行,具体需要打印多少行可以根据具体代码情况修改:

命令如下:

jstack -l 59077 |grep javaSimpleJob -C 10 --col


结果如下:

➜  ~ jstack -l 59077 |grep javaSimpleJob -C 10 --col
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
 - None

"inner-job-javaSimpleJob-3" #95 daemon prio=5 os_prio=31 tid=0x00007fa9b51d2000 nid=0x13d03 waiting on condition [0x00007000130f1000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
 at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
 at java.util.concurrent.FutureTask.run(FutureTask.java)
--
--
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
 - None

"inner-job-javaSimpleJob-2" #81 daemon prio=5 os_prio=31 tid=0x00007fa9b3a17800 nid=0x14907 waiting on condition [0x00007000124cd000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
 at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
 at java.util.concurrent.FutureTask.run(FutureTask.java)
--
--
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
 - None

"inner-job-javaSimpleJob-1" #64 daemon prio=5 os_prio=31 tid=0x00007fa9b31ee800 nid=0x8603 waiting on condition [0x00007000119ac000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob.execute(JavaSimpleJob.java:39)
 at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.access$000(AbstractElasticJobExecutor.java:47)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor$1.run(AbstractElasticJobExecutor.java:185)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
 at java.util.concurrent.FutureTask.run(FutureTask.java)
--
--
"Timer-0" #30 daemon prio=5 os_prio=31 tid=0x00007fa9b3155800 nid=0x6903 in Object.wait() [0x000070001036a000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at java.util.TimerThread.mainLoop(Timer.java:552)
 - locked <0x00000006c0035c20> (a java.util.TaskQueue)
 at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
 - None

"javaSimpleJob_QuartzSchedulerThread" #29 prio=5 os_prio=31 tid=0x00007fa9b5120800 nid=0x6803 in Object.wait() [0x0000700010267000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at org.quartz.simpl.SimpleThreadPool.blockForAvailableThreads(SimpleThreadPool.java:452)
 - locked <0x00000006c0035e80> (a java.lang.Object)
 at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:263)

   Locked ownable synchronizers:
 - None

--
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at org.quartz.simpl.SimpleThreadPool.blockForAvailableThreads(SimpleThreadPool.java:452)
 - locked <0x00000006c0035e80> (a java.lang.Object)
 at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:263)

   Locked ownable synchronizers:
 - None

"javaSimpleJob_Worker-1" #28 prio=5 os_prio=31 tid=0x00007fa9b5091800 nid=0x9b03 waiting on condition [0x0000700010164000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x00000006c007cd48> (a java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:193)
 at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)


在线程前后的堆栈信息中可以看到如下线程信息:
  • javaSimpleJob_QuartzSchedulerThread作业定时器线程:

    • 处于TIMED_WAITING状态,代码总是停留在blockForAvailableThreads等待可用工作线程处

  • javaSimpleJob_Worker-1作业的Quartz工作线程:

    • 处于WAITING状态,代码总是停留在我们了解的AbstractElasticJobExecutor的process执行作业方法中的CountDownLatch的wait方法处等待分片线程执行完成

  • inner-job-javaSimpleJob-3,inner-job-javaSimpleJob-2,inner-job-javaSimpleJob-1这3个作业的分片线程:

    • 处于TIMED_WAITING状态,代码总是停留在JavaSimpleJob的39行处也就是我们写的Thread.sleep(30*60*1000);休眠代码处

通过对异常堆栈的分析可以了解到我们当前作业正在执行并且卡在分片线程中,异常代码在JavaSimpleJob的39行处

- END -



文章转载自中间件源码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论