Spring 异步在实现 AsyncConfigurer 时不起作用

Spring async doesn't work when implements AsyncConfigurer

具有异步方法的 Spring 配置 class 作为:

@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration {

@Autowired
private ApplicationContext applicationContext;

@Bean
public ActivityMessageListener activityMessageListener() {
    return new ActivityMessageListener();
}
@Bean
public TaskExecutor defaultExecutor()
{
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(10);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);

    return threadPoolTaskExecutor;
}

我所有的 @Async 方法都按预期工作,但是如果我将 AsyncConfigurer 实现到 AsyncConfiguration 中以捕获实现 getAsyncUncaughtExceptionHandler() 方法的异常,我的 bean 不会被代理所以方法 @Async 不会 运行 在池执行器中。

这是无效配置:

@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

@Autowired
private ApplicationContext applicationContext;

@Bean
public ActivityMessageListener activityMessageListener() {
    return new ActivityMessageListener();
}

@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(10);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);

    return threadPoolTaskExecutor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()        {
    return new SimpleAsyncUncaughtExceptionHandler();
}

可能会发生什么?

我们这样使用 @Async

public class ActivityMessageListener extends BaseMessageListener {

public static final String PARAM_USER_ID                = "userId";
public static final String PARAM_COMPANY_ID             = "companyId";
public static final String PARAM_CREATE_DATE            = "createDate";
public static final String PARAM_CLASS_NAME             = "className";
public static final String PARAM_CLASS_PK               = "classPK";
public static final String PARAM_TYPE                   = "type";
public static final String PARAM_EXTRA_DATA             = "extraData";
public static final String PARAM_RECEIVED_USER_ID       = "receiverUserId";

@Override @Async(value = "defaultExecutor")
public Future<String> doReceive(Message message) throws Exception {

    String name = Thread.currentThread().getName();
    Map<String, Object> parameters  = message.getValues();
    Long userId                     = (Long)parameters.get(ActivityMessageListener.PARAM_USER_ID);
    Long companyId                  = (Long)parameters.get(ActivityMessageListener.PARAM_COMPANY_ID);
    Date createDate                 = (Date)parameters.get(ActivityMessageListener.PARAM_CREATE_DATE);
    String className                = (String)parameters.get(ActivityMessageListener.PARAM_CLASS_NAME);
    Long classPK                    = (Long)parameters.get(ActivityMessageListener.PARAM_CLASS_PK);
    Integer type                    = (Integer)parameters.get(ActivityMessageListener.PARAM_TYPE);
    String extraData                = (String)parameters.get(ActivityMessageListener.PARAM_EXTRA_DATA);
    Long receiverUserId             = (Long)parameters.get(ActivityMessageListener.PARAM_RECEIVED_USER_ID);
    ActivityLocalServiceUtil.addActivity(userId, companyId, createDate, className, classPK, type, extraData, receiverUserId);

    return new AsyncResult<String>(name);
}
}

编辑: 我已经提交了 bug report (SPR-14630).


我正要向 Spring 的 issue tracker 提交错误报告,但是当我准备一个小应用程序来重现错误时,我发现并修复了问题。

首先,在使用ThreadPoolTaskExecutor的时候,在返回之前应该先调用它的initialize()方法:

@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(1);
    executor.setCorePoolSize(1);
    executor.setThreadNamePrefix("CUSTOM-");

    // Initialize the executor
    executor.initialize();

    return executor;
}

同样出于某种原因,如果我在同一配置 class 中定义的 @PostConstruct 方法中使用 bean,它不会 运行 异步。原因是@PostConstruct方法先于getAsyncExecutor()getAsyncUncaughtExceptionHandler()执行:

AsyncBean.java:

@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
                String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());

        System.out.println(message);
    }
}

AsyncDemoApp.java:

@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }

    @PostConstruct
    public void start() {
        System.out.println("AsyncDemoApp.start");
        asyncBean.whoAmI();
    }
}

输出:

AsyncDemoApp.start
My name is AsyncBean and I am running in Thread[main,5,main]
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler

但是,如果您在应用程序上下文准备就绪后使用 bean,它应该会按预期工作:

@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    public static void main(String[] args) {
        final ConfigurableApplicationContext context = SpringApplication.run(AsyncDemoApp.class, args);

        final IAsyncBean asyncBean = context.getBean(IAsyncBean.class);

        asyncBean.whoAmI();
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }
}

另一个奇怪的行为是,如果您 autowire 相同配置中的异步 bean class,自动连接发生在配置自定义异步执行器之前,因此bean 不会 运行 异步,它 运行 在主线程中。这可以通过将 @PostConstruct 添加到 AsyncBean 并使用 CommandLineRunner 到 运行 应用程序来验证(我个人认为这是一个错误。行为非常令人惊讶地说最少):

AsyncBean@PostConstruct:

@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
                String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());

        System.out.println(message);
    }

    @PostConstruct
    public void postConstruct() {
        System.out.println("AsyncBean is constructed");
    }
}

AsyncDemoApp 实施 CommandLineRunner:

@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer, CommandLineRunner {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("AsyncDemoApp.run");
        asyncBean.whoAmI();
    }
}

输出:

AsyncBean is constructed
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler
AsyncDemoApp.run
My name is AsyncBean and I am running in Thread[main,5,main]

还有一件事! :) 如果您使用 ThreadPoolTaskExecutor,根据您的要求,您可能需要将其守护程序 属性 设置为 true,否则您的应用程序将永远保持 运行ning(这不是一个大问题Web/Worker 应用程序的问题)。 setDaemon(boolean) 的 JavaDoc 是这样说的:

Set whether this factory is supposed to create daemon threads, just executing as long as the application itself is running. Default is "false": Concrete factories usually support explicit cancelling. Hence, if the application shuts down, Runnables will by default finish their execution. Specify "true" for eager shutdown of threads which still actively execute a Runnable at the time that the application itself shuts down.