暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Spring @EnableScheduling 注解解析

摘星族 2018-11-09
440

概述

Spring 的@EnableScheduling
 为我们提供了快速的基于多种规则的任务调度功能。在《Spring 4.x Task 和 Schedule 概述》一文中对Spring 实现的异步任务和定时计划作了概要性的介绍,本文将对其实现原理进行解析。

核心原理

@EnableScheduling

要使用Spring
 的注解@Scheduled
 来快速开启任务调度功能,只需要添加如下配置:

1
2
3
4
@Configuration
@EnableScheduling
public class ScheduleConfig {
}


@EnableScheduling
 注解对应的内容如下:

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}


由上可以看到实际上是SchedulingConfiguration.class
 类实现了Spring
 的任务调度框架级功能。该配置类仅仅是定义了ScheduledAnnotationBeanPostProcessor
 的实例。Spring 的调度功能由该实例进行配置。

ScheduledAnnotationBeanPostProcessor

该类实现的接口如下所示:
ScheduledAnnotationBeanPostProcessor 实现的接口

BeanPostProcessor

BeanPostProcessor
 作为框架级接口,为实现该接口的类提供了对由 Spring 框架进行组装的单实例 Bean 进行处理的功能。其接口为:

1
2
3
4
5
6
public interface BeanPostProcessor {
Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException;
Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException;
}


实际上,Spring
 正是在postProcessAfterInitialization(Object bean, String beanName)
 实现了对拥有@Scheduled
 注解的实例 Bean 的处理。将其方法添加到对应的任务调度类别中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
   Class<?> targetClass = AopUtils.getTargetClass(bean);
   if (!this.nonAnnotatedClasses.contains(targetClass)) {
       Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
               new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {
                   @Override
                   public Set<Scheduled> inspect(Method method) {
                       Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                               method, Scheduled.class, Schedules.class);
                       return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                   }
               });
       if (annotatedMethods.isEmpty()) {
           this.nonAnnotatedClasses.add(targetClass);
           if (logger.isTraceEnabled()) {
               logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
           }
       }
       else {
           // Non-empty set of methods
           for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
               Method method = entry.getKey();
               for (Scheduled scheduled : entry.getValue()) {
                   processScheduled(scheduled, method, bean);
               }
           }
           if (logger.isDebugEnabled()) {
               logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                       "': " + annotatedMethods);
           }
       }
   }
   return bean;
}


上述方法调用的processScheduled(Scheduled scheduled, Method method, Object bean)
 实现了对注解@Scheduled
 的内容的解析,并将对应的调度任务类型添加到ScheduledTaskRegistrar
 实例中。

初始化 TaskScheduler

在 Spring 容器启动时,分别会在afterSingletonsInstantiated()
 和 onApplicationEvent(ContextRefreshedEvent event)
 方法中调用ScheduledAnnotationBeanPostProcessor.finishRegistration()

finishRegistration()
 的逻辑如下:

计划任务执行器TaskScheduler
 是否存储,存在就将其传递给 ScheduledTaskRegistrar.setScheduler(Object scheduler)

检查系统中是否存在实现了回调接口SchedulingConfigurer
 的实例 Bean,如果存在则将现有的 ScheduledTaskRegistrar
实例添加到SchedulingConfigurer
 中

1
2
3
4
5
6
7
if (this.beanFactory instanceof ListableBeanFactory) {
   Map<String, SchedulingConfigurer> configurers =
           ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
   for (SchedulingConfigurer configurer : configurers.values()) {
       configurer.configureTasks(this.registrar);
   }
}

如若存在需要调度的任务,同时TaskScheduler
 不存在,则执行分别按类型TaskScheduler.class
 和 ScheduledExecutorService.class
 进行 Bean
 的查找工作

1
2
this.registrar.setTaskScheduler(resolveSchedulerBean(TaskScheduler.class, false));
this.registrar.setScheduler(resolveSchedulerBean(ScheduledExecutorService.class, false));

调用 ScheduledTaskRegistrar.afterPropertiesSet()
,在该方法中,如果之前步骤都没有找到对应的TaskScheduler
 则直接调用Executors.newSingleThreadScheduledExecutor()
 构造

最终使用的任务执行器为ConcurrentTaskScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void afterPropertiesSet() {
   scheduleTasks();
}

/**
* Schedule all registered tasks against the underlying {@linkplain
* #setTaskScheduler(TaskScheduler) task scheduler}.
*/
protected void scheduleTasks() {
   if (this.taskScheduler == null) {
       this.localExecutor = Executors.newSingleThreadScheduledExecutor();
       this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
   }
   if (this.triggerTasks != null) {
       for (TriggerTask task : this.triggerTasks) {
           addScheduledTask(scheduleTriggerTask(task));
       }
   }
   if (this.cronTasks != null) {
       for (CronTask task : this.cronTasks) {
           addScheduledTask(scheduleCronTask(task));
       }
   }
   if (this.fixedRateTasks != null) {
       for (IntervalTask task : this.fixedRateTasks) {
           addScheduledTask(scheduleFixedRateTask(task));
       }
   }
   if (this.fixedDelayTasks != null) {
       for (IntervalTask task : this.fixedDelayTasks) {
           addScheduledTask(scheduleFixedDelayTask(task));
   }
}

由上述TaskScheduler
 实例的初始化过程来看,默认的 Spring 上下文中并不存在实现了TaskScheduler.class
 或 ScheduledExecutorService.class
 接口的 Bean。因此也不能直接@Autowire
。但是我们可以自己定义实现了这些接口的实例,例如ThreadPoolTaskScheduler

自定义TaskScheduler

实现 SchedulingConfigurer 接口自定义

SchedulingConfigurer.configureTasks(ScheduledTaskRegistrar taskRegistrar)
 为我们提供了一个回调。也就是说添加了@Configuration
 注解的配置类上,我们可以实现该接口,对系统中真正处理任务的ScheduledTaskRegistrar
 类进行修改。
同时,该回调接口为我们实现Trigger-based
 的计划任务提供了方法()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@EnableScheduling
public class ScheduleCallbackConfig implements SchedulingConfigurer {

   @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
   public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
       ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
       // self define error handler
       //threadPoolTaskScheduler.setErrorHandler(null);
       return new ThreadPoolTaskScheduler();
   }

   @Override
   public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
       taskRegistrar.setTaskScheduler(threadPoolTaskScheduler());
       taskRegistrar.addTriggerTask(new TriggerTask(new Runnable() {
           @Override
           public void run() {
               System.out.println("task implements");
           }
       }, new CronTrigger("")));
   }
}

直接注入 Bean

直接注入实现了TaskScheduler.class
 或 ScheduledExecutorService.class
 接口的 Bean 进行自定义。这样启动时,会在 Spring ApplicationContext 上下文查找对应的 Bean。

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
@EnableScheduling
public class ScheduleCallbackConfig {
   @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
   public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
       ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
       threadPoolTaskScheduler.setPoolSize(10);
       // self define error handler
       //threadPoolTaskScheduler.setErrorHandler(null);
       return threadPoolTaskScheduler;
   }
}


其他类

ConcurrentTaskScheduler

该类提供了对java.util.concurrent.ScheduledExecutorService
 的适配,同时它能够自动检测JSR-236 javax.enterprise.concurrent.ManagedScheduledExecutorService
,以用来实现trigger-based
 计划,代替java.util.concurrent.ScheduledExecutorService
 实现的 delay-based
计划任务。


ConcurrentTaskScheduler 类图

DefaultManagedTaskScheduler

JNDI-based,在 Java EE 7 环境, 默认查找 JSR-236的 “java:comp/DefaultManagedScheduledExecutorService”。
DefaultManagedTaskScheduler 类图

ThreadPoolTaskScheduler

实现了 Spring 的 TaskScheduler 接口, 包装了原生的 java.util.concurrent.ScheduledThreadPoolExecutor

ThreadPoolTaskScheduler 类图

分析

默认的 ConcurrentTaskScheduler
 计划执行器采用Executors.newSingleThreadScheduledExecutor()
 实现单线程的执行器。因此,对同一个调度任务的执行总是同一个线程。如果任务的执行时间超过该任务的下一次执行时间,则会出现任务丢失,跳过该段时间的任务。
上述问题有以下解决办法:

  • 采用异步的方式执行调度任务,配置 Spring 的 @EnableAsync
    ,在任务执行的方法上标注 @Async

  • 配置任务执行池,采用 ThreadPoolTaskScheduler.setPoolSize(n)
    。 n
     的数量为 单个任务执行所需时间 / 任务执行的间隔时间

总结

本文详细介绍了 Spring 基于注解实现的计划任务调度功能。并对其实现原理及源代码进行了解析。同时对如何自定义开发进行了说明。由此可见,要理解 Spring 的框架级功能,一定要熟知 Spring Bean 的生命周期。

参考

  1. spring-framework-reference


本文章采用知识共享署名 2.5 中国大陆许可协议进行许可。
欢迎转载,但转载请注明来自张兆玉,并保持转载后文章内容的完整。本人保留所有版权相关权利。
本文链接:tramp.cincout.cn/2017/08/18/spring-task-2017-08-18-spring-boot-enablescheduling-analysis/

文章转载自摘星族,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论