Spring async doesn't work when implementing AsyncConfigurer
I have a Spring configuration class with async support, as follows:
@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public ActivityMessageListener activityMessageListener() {
        return new ActivityMessageListener();
    }

    @Bean
    public TaskExecutor defaultExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
        return threadPoolTaskExecutor;
    }
}
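All my @Async methods work as expected. However, if I make AsyncConfiguration implement AsyncConfigurer so that I can handle exceptions by implementing getAsyncUncaughtExceptionHandler(), my beans are no longer proxied, so the @Async methods do not run on the pool executor.
This is the configuration that does not work: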
@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public ActivityMessageListener activityMessageListener() {
        return new ActivityMessageListener();
    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}
What could be happening?
This is how we use @Async:
public class ActivityMessageListener extends BaseMessageListener {

    public static final String PARAM_USER_ID = "userId";
    public static final String PARAM_COMPANY_ID = "companyId";
    public static final String PARAM_CREATE_DATE = "createDate";
    public static final String PARAM_CLASS_NAME = "className";
    public static final String PARAM_CLASS_PK = "classPK";
    public static final String PARAM_TYPE = "type";
    public static final String PARAM_EXTRA_DATA = "extraData";
    public static final String PARAM_RECEIVED_USER_ID = "receiverUserId";

    @Override
    @Async(value = "defaultExecutor")
    public Future<String> doReceive(Message message) throws Exception {
        String name = Thread.currentThread().getName();
        Map<String, Object> parameters = message.getValues();
        Long userId = (Long) parameters.get(ActivityMessageListener.PARAM_USER_ID);
        Long companyId = (Long) parameters.get(ActivityMessageListener.PARAM_COMPANY_ID);
        Date createDate = (Date) parameters.get(ActivityMessageListener.PARAM_CREATE_DATE);
        String className = (String) parameters.get(ActivityMessageListener.PARAM_CLASS_NAME);
        Long classPK = (Long) parameters.get(ActivityMessageListener.PARAM_CLASS_PK);
        Integer type = (Integer) parameters.get(ActivityMessageListener.PARAM_TYPE);
        String extraData = (String) parameters.get(ActivityMessageListener.PARAM_EXTRA_DATA);
        Long receiverUserId = (Long) parameters.get(ActivityMessageListener.PARAM_RECEIVED_USER_ID);
        ActivityLocalServiceUtil.addActivity(userId, companyId, createDate, className, classPK, type, extraData, receiverUserId);
        return new AsyncResult<String>(name);
    }
}
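Edit: I have filed a bug report (SPR-14630).
I was about to file a bug report on Spring's issue tracker, but while preparing a small application to reproduce the bug, I found and fixed the problem.
First of all, when using a ThreadPoolTaskExecutor, you should call its initialize() method before returning it: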
@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(1);
    executor.setCorePoolSize(1);
    executor.setThreadNamePrefix("CUSTOM-");
    // Initialize the executor
    executor.initialize();
    return executor;
}
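As an alternative to calling initialize() by hand, the Javadoc of @EnableAsync notes that annotating getAsyncExecutor() with @Bean turns the executor into a fully managed bean, in which case the container runs the initialization itself. Below is a minimal sketch under that assumption. The class name ManagedExecutorConfiguration is hypothetical, and the sketch has not been run against the setup from the question:

```java
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical class name, used only for this sketch.
@Configuration
@EnableAsync
public class ManagedExecutorConfiguration implements AsyncConfigurer {

    // @Bean makes the executor a container-managed bean, so the container
    // invokes its initialization callback; there is no explicit
    // initialize() call in this method.
    @Bean
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("CUSTOM-");
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}
```

This only changes who initializes the executor. It does not change the order in which beans are created.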
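Also, for some reason, if I use the bean in a @PostConstruct method defined in the same configuration class, it does not run asynchronously. The reason is that the @PostConstruct method is executed before getAsyncExecutor() and getAsyncUncaughtExceptionHandler():
AsyncBean.java: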
@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
            String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());
        System.out.println(message);
    }
}
AsyncDemoApp.java:
@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects) -> throwable.printStackTrace();
    }

    @PostConstruct
    public void start() {
        System.out.println("AsyncDemoApp.start");
        asyncBean.whoAmI();
    }
}
Output:
AsyncDemoApp.start
My name is AsyncBean and I am running in Thread[main,5,main]
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler
However, if you use the bean after the application context is ready, it should work as expected:
@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    public static void main(String[] args) {
        final ConfigurableApplicationContext context = SpringApplication.run(AsyncDemoApp.class, args);
        final IAsyncBean asyncBean = context.getBean(IAsyncBean.class);
        asyncBean.whoAmI();
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects) -> throwable.printStackTrace();
    }
}
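Another weird behavior is that if you autowire the async bean in the same configuration class, the autowiring happens before the custom async executor is configured, so the bean does not run asynchronously and runs in the main thread instead. This can be verified by adding a @PostConstruct method to AsyncBean and using a CommandLineRunner to run the application (I personally think this is a bug; the behavior is very surprising, to say the least):
AsyncBean with @PostConstruct: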
@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
            String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());
        System.out.println(message);
    }

    @PostConstruct
    public void postConstruct() {
        System.out.println("AsyncBean is constructed");
    }
}
AsyncDemoApp implementing CommandLineRunner:
@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer, CommandLineRunner {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects) -> throwable.printStackTrace();
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("AsyncDemoApp.run");
        asyncBean.whoAmI();
    }
}
Output:
AsyncBean is constructed
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler
AsyncDemoApp.run
My name is AsyncBean and I am running in Thread[main,5,main]
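One more thing! :) If you use ThreadPoolTaskExecutor, then depending on your requirements you might want to set its daemon property to true; otherwise your application will keep running forever (this is not a big issue for Web/Worker applications). Here is what the JavaDoc of setDaemon(boolean) says: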
Set whether this factory is supposed to create daemon threads, just
executing as long as the application itself is running. Default is
"false": Concrete factories usually support explicit cancelling.
Hence, if the application shuts down, Runnables will by default finish
their execution. Specify "true" for eager shutdown of threads which
still actively execute a Runnable at the time that the application
itself shuts down.
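To make the daemon flag concrete, here is a small plain-JDK sketch. It does not use Spring and is not Spring's implementation: the class DaemonThreadDemo and its helper methods are hypothetical names chosen for illustration. It builds a thread factory that marks its threads as daemon or non-daemon, which is the flag that setDaemon(boolean) controls for the threads a ThreadPoolTaskExecutor creates:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; plain JDK only.
public class DaemonThreadDemo {

    // Builds a thread factory whose threads carry the given daemon flag.
    public static ThreadFactory factory(String prefix, boolean daemon) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + counter.incrementAndGet());
            thread.setDaemon(daemon);
            return thread;
        };
    }

    // Runs one task on a single-thread pool and reports whether the
    // worker thread that executed it is a daemon thread.
    public static boolean workerIsDaemon(boolean daemon) {
        ExecutorService pool = Executors.newSingleThreadExecutor(factory("CUSTOM-", daemon));
        try {
            Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Without this call, a pool of non-daemon threads would keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("daemon=false -> worker is daemon: " + workerIsDaemon(false));
        System.out.println("daemon=true  -> worker is daemon: " + workerIsDaemon(true));
    }
}
```

The JVM exits once only daemon threads remain, so idle non-daemon pool threads keep a standalone application alive until the pool is shut down. With ThreadPoolTaskExecutor, the corresponding call would be executor.setDaemon(true) before the executor is initialized.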