使用 @Async 注释限制线程数并在达到最大线程数时等待

Limit the number of threads and wait if max threads reached using @Async annotation

我正在使用 Spring 的 Java 配置和 AsyncConfigurer:

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}

现在假设我有一个带有@Async 注释的方法,并且假设它已经被调用了 2 次并且仍然有 2 个线程 运行。据我了解,对它的任何新调用都将添加到容量为 10 的队列中。现在如果我收到第 11 个任务,它的行为会是什么?它会拒绝此处所述的任务:https://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html?还是呼叫者会等待队列槽变空?

我的要求是不执行使用@Async 方法生成的固定数量的线程,并在达到最大线程数时让调用者等待。如果我将 ConcurrentTaskExecutor 与特定大小的固定线程池一起使用,是否可以实现?

根据 ThreadPoolExecutor 的工作原理,第 11 个任务将被拒绝,因为当队列已满时,执行程序会尝试增加池大小,如果由于达到最大值而无法完成,则会拒绝该任务.
您可以找到信息 there in the Spring documentation :

The main idea is that, when a task is submitted, the executor first tries to use a free thread if the number of active threads is currently less than the core size. If the core size has been reached, the task is added to the queue, as long as its capacity has not yet been reached. Only then, if the queue’s capacity has been reached, does the executor create a new thread beyond the core size. If the max size has also been reached, then the executor rejects the task.

关于您的要求:

My Requirement is to not exeed a fix number of threads spawned using @Async method and to make the caller wait if max number of threads are reached. Will this be achieved if I use ConcurrentTaskExecutor with a fixed thread pool of a particular size?

因此增加队列大小并为核心和最大池大小保留相同的值。您还可以使用无界队列,它是队列大小参数的默认值,但要小心,因为如果队列中堆积了太多任务,它可能会导致 OutOfMemoryError

我想限制可能的线程数同时不丢失任何消息。现有的答案没有满足我的这个要求,我找到了另一种方法来做到这一点。因此,将其发布为答案:


我做了一个Executor Bean如下:

@Bean(name = "CustomAsyncExecutor")
public Executor customThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setQueueCapacity(0);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("Async_Thread_");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.initialize();
    return executor;
}

然后使用

@Async("CustomAsyncExecutor")
public void methodName(){
....
}

鉴于当线程繁忙和队列已满时,新任务会被拒绝,

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())

帮助我,当我的 5 个线程忙时,我的调用线程将执行任务,因为我的调用线程在异步函数中,它不会接受任何新任务。因此,我不会在不增加队列大小的情况下丢失我的任务。