@EnableAsync的javadoc
@EnableAsync可以让Spring启用异步方法执行,就跟在xml中配置<task:*> 效果是一样的。它可以跟@Configuration结合,让整个Spring环境启用基于注解的异步处理:
@Configuration@EnableAsyncpublic class AppConfig {}
比如下面:MyAsyncBean是一个用户自定义的Bean,它里面的方法上添加了@Async注解或者EJB 3.1 的@javax.ejb.Asynchronous注解或者是用户自定义的用annotation()指定的注解,通过以下配置,Spring会自动给这个Bean添加切面,这个对用户是透明的。也就是说,背后是通过AOP技术来实现的,源码后面再进行分析。
@Configurationpublic class AnotherAppConfig {@Beanpublic MyAsyncBean asyncBean() {return new MyAsyncBean();}}
异步方法执行的线程池
默认情况下,Spring会查找一个关联的线程池:要么是一个org.springframework.core.task.TaskExecutor类型的bean,要么是一个类型是 java.util.concurrent.Executor名字是taskExecutor的bean。如果这两个都没找到,Spring会使用org.springframework.core.task.SimpleAsyncTaskExecutor来执行异步方法,SimpleAsyncTaskExecutor实际上并不是线程池,它是为每一个新任务创建一个新的线程。
此外,如果异步方法的返回值是void,那么调用过程中的异常信息是无法返回给调用者的,默认只是记录日志。
自定义线程池和异常处理
要自定义线程池和异常处理,可以实现AsyncConfigurer接口,来提供你自己的Executor和AsyncUncaughtExceptionHandler:
@Configuration@EnableAsyncpublic class AppConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(7);executor.setMaxPoolSize(42);executor.setQueueCapacity(11);executor.setThreadNamePrefix("MyExecutor-");executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return MyAsyncUncaughtExceptionHandler();}}
如果只需要自定义其中的一个,另一个可以直接返回null,Spring会使用默认的设置,如果有自定义的需求请扩展AsyncConfigurerSupport,它实现了AsyncConfigurer接口。
注意:上面的例子中,ThreadPoolTaskExecutor并没有被Spring容器管理,可以在getAsyncExecutor() 上添加@Bean注解让它变成Spring管理的Bean。如果加入到Spring容器,那么就不需要手动调用executor.initialize() 做初始化了,因为在Bean初始化的时候会自动调用这个方法。
XML配置方式
<beans><task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/><task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/><bean id="asyncBean" class="com.foo.MyAsyncBean"/><bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/></beans>
以上基于xml和基于javaconfig的例子是等价的,除了给Executor添加线程名字前缀之外。这是因为<task:executor>并没有暴露这样的属性,这也说明javaconfig的方式可以更全面的进行配置。
生成切面的方式
@EnableAsync注解的mode()这个属性用来控制如何应用切面:如果mode是AdviceMode.PROXY(默认),其他的几个属性会一起来控制如何进行代理,请注意,代理只可以拦截在代理对象上的方法调用,在目标对象内部的方法调用是无法被拦截的。如果mode是AdviceMode.ASPECTJ,proxyTargetClass这个属性会被忽略,同时要注意此时classpath中必须要包含spring-aspects相关模块的jar包,此时就不存在代理了,方法内部调用也会被拦截。
@Aysnc的javadoc文档
用来标记一个方法是异步执行的,它可以加在方法上也可以加在类上,如果是加在类上,说明类里面所有的方法都是异步执行的。
目标方法可以有任意的参数,但是返回值只能要么是void,要么是Future,如果是Future,也可以返回它的子类ListenableFuture或者CompletableFuture,这样就可以跟异步任务更好的进行交互,如果不是future是无法获取返回值的。
看一下EnableAsync的源码
@EnableAsync
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(AsyncConfigurationSelector.class)public @interface EnableAsync {}
接着看AsyncConfigurationSelector:
public class AsyncConfigurationSelector extendsAdviceModeImportSelector<EnableAsync> {}public abstract class AdviceModeImportSelector<A extends Annotation>implements ImportSelector{}
显然,AsyncConfigurationSelector是一个ImportSelector,Spring会回调:AdviceModeImportSelector#selectImports(),最终会回调到:AsyncConfigurationSelector#selectImports():
public String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] { ProxyAsyncConfiguration.class.getName() };case ASPECTJ:return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };default:return null;}}
我们只看PROXY的情况,此时会向容器注入ProxyAsyncConfiguration这个配置类,看下它的定义:
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {//定义了一个叫做“org.springframework.context.annotation.internalAsyncAnnotationProcessor”的Bean@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");//bean的实际的类型是AsyncAnnotationBeanPostProcessorAsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();//获取@EnableAsync里面的annotation,默认实际@Async,可以自定义//enableAsync的值是父类AbstractAsyncConfiguration进行获取的Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}// executor和exceptionHandler的值也是父类AbstractAsyncConfiguration进行获取的if (this.executor != null) {bpp.setExecutor(this.executor);}if (this.exceptionHandler != null) {bpp.setExceptionHandler(this.exceptionHandler);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}
看下它的父类AbstractAsyncConfiguration:
@Configurationpublic abstract class AbstractAsyncConfiguration implements ImportAware {@Nullableprotected AnnotationAttributes enableAsync;@Nullableprotected Executor executor;@Nullableprotected AsyncUncaughtExceptionHandler exceptionHandler;@Overridepublic void setImportMetadata(AnnotationMetadata importMetadata) {this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));if (this.enableAsync == null) {throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());}}@Autowired(required = false)void setConfigurers(Collection<AsyncConfigurer> configurers) {if (CollectionUtils.isEmpty(configurers)) {return;}if (configurers.size() > 1) {throw new IllegalStateException("Only one AsyncConfigurer may exist");}AsyncConfigurer configurer = configurers.iterator().next();this.executor = configurer.getAsyncExecutor();this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();}}
父类主要是用来获取子类中用的那三个变量,注意setConfigurers()上添加了@Autowired注解,可以注入Spring容器中的所有的AsyncConfigurer类型的Bean,正如javadoc中所说的那样。
继续看下AsyncAnnotationBeanPostProcessor:
public class AsyncAnnotationBeanPostProcessor extendsAbstractBeanFactoryAwareAdvisingPostProcessor{}public abstract class AbstractBeanFactoryAwareAdvisingPostProcessorextends AbstractAdvisingBeanPostProcessor implements BeanFactoryAware {}public abstract class AbstractAdvisingBeanPostProcessorextends ProxyProcessorSupport implements BeanPostProcessor{}
如果我们没有自定义AsyncConfigurer,那么我们来看下javadco中说的Executor和ExceptionHandler是如何赋值的。AsyncAnnotationBeanPostProcessor实现了BeanFactoryAware,因此容器会回调:
@Overridepublic void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);//默认this.executor, this.exceptionHandler都是nullAsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;}
这里就是在创建切点和切面,继续看AsyncAnnotationAdvisor:
@SuppressWarnings("unchecked")public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);//默认的异步方法的注解是@AsyncasyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}//这里创建advice和pointcutthis.advice = buildAdvice(executor, exceptionHandler);this.pointcut = buildPointcut(asyncAnnotationTypes);}
protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;}
实际在执行异步方法的时候,首先要经过AnnotationAsyncExecutionInterceptor这个拦截器:
public class AnnotationAsyncExecutionInterceptorextends AsyncExecutionInterceptor{}public class AsyncExecutionInterceptorextends AsyncExecutionAspectSupportimplements MethodInterceptor, Ordered{}public abstract class AsyncExecutionAspectSupportimplements BeanFactoryAware {}
先看下构造函数,最终是到了AsyncExecutionAspectSupport里面:
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);}
这里面创建了默认的Executor和exceptionHandler,继续看下AsyncExecutionInterceptor#getDefaultExecutor():
@Override@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());}
如果父类没有获取到,就创建一个SimpleAsyncTaskExecutor。看一下super.getDefaultExecutor(),也就是AsyncExecutionAspectSupport#getDefaultExecutor():
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {//首先是从容器中找TaskExecutor类型的Beanreturn beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {//然后是找类型是Executor,名字是taskExecutor的Beanreturn beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {。。。}}catch (NoSuchBeanDefinitionException ex) {try {//然后是找类型是Executor,名字是taskExecutor的Beanreturn beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {。。。}}return null;}
实际执行异步方法的时候,看下AsyncExecutionInterceptor的invoke:
public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);//这里来确定要使用的executorAsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}Callable<Object> task = () -> {try {Object result = invocation.proceed();//看这里的返回值,只有是Future才可以返回值if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {//这里面会使用到exceptionHandlerhandleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//这里看下返回值的问题return doSubmit(task, executor, invocation.getMethod().getReturnType());}
看下是如何来确定要使用的executor的:
@Nullableprotected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;//这里是获取@Async注解里面的valueString qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {//如果获取到值,则使用这个qualifier来查找ExecutortargetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {targetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;}
再看下异步方法的返回值的处理:
@Nullableprotected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);}else { // 如果不是future是无法获取到方法的返回值的executor.submit(task);return null;}}
测试代码请参考:https://github.com/xjs1919/enumdemo 下面的 async-demo。




