池中只有一个线程的 ThreadPoolTaskExecutor 不处理来自 AWS 队列的消息
ThreadPoolTaskExecutor with just one thread on pool not processing messages from AWS queue
我为在应用程序上注册的每个队列创建了一个随需应变的 ChannelAdapter、AsyncTaskExecutor 和一个 Channel。我注意到当 AsyncTaskExecutor 的 maxPoolSize
的数量等于 1 时,消息没有被处理。这就是创建 AsyncTaskExecutor bean 的方式。
static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDefinition beanDefinition = builder.getBeanDefinition();
registry.registerBeanDefinition(beanName, beanDefinition);
}
我注意到的另一件事是调用此方法时 java.util.concurrent.ThreadPoolExecutor#execute
此条件 workerCountOf(c) < corePoolSize
始终为假。
完整的项目 link 在这里 https://github.com/LeoFuso/spring-integration-aws-demo
为一些可管理的组件提供一个线程池总是不好的做法。您可能不知道该组件将如何处理您的线程池,这确实可能是您的单个线程被内部某些 long-living 任务占用并且所有新任务都将在队列中停止的事实等待单个线程空闲,这可能不会发生。
事实上,这就是我们从 Spring Cloud AWS 中获得的 AsynchronousMessageListener
所拥有的,它被提到的 SqsMessageDrivenChannelAdapter
:
使用
public void run() {
while (isQueueRunning()) {
所以,或者依赖默认的执行器,或者为你自己的执行器提供足够的线程。
看来那边的线程数逻辑是这样的:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor
.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
因此,我们在提供 SQS 队列时拥有确切的线程数,外加 2
工作线程乘数。看起来我们需要一个线程让每个队列进行轮询,并需要额外的线程来处理来自它们的消息。
(不是 Spring 集成问题 - 更像是 Spring Cloud AWS)。
我为在应用程序上注册的每个队列创建了一个随需应变的 ChannelAdapter、AsyncTaskExecutor 和一个 Channel。我注意到当 AsyncTaskExecutor 的 maxPoolSize
的数量等于 1 时,消息没有被处理。这就是创建 AsyncTaskExecutor bean 的方式。
static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDefinition beanDefinition = builder.getBeanDefinition();
registry.registerBeanDefinition(beanName, beanDefinition);
}
我注意到的另一件事是调用此方法时 java.util.concurrent.ThreadPoolExecutor#execute
此条件 workerCountOf(c) < corePoolSize
始终为假。
完整的项目 link 在这里 https://github.com/LeoFuso/spring-integration-aws-demo
为一些可管理的组件提供一个线程池总是不好的做法。您可能不知道该组件将如何处理您的线程池,这确实可能是您的单个线程被内部某些 long-living 任务占用并且所有新任务都将在队列中停止的事实等待单个线程空闲,这可能不会发生。
事实上,这就是我们从 Spring Cloud AWS 中获得的 AsynchronousMessageListener
所拥有的,它被提到的 SqsMessageDrivenChannelAdapter
:
public void run() {
while (isQueueRunning()) {
所以,或者依赖默认的执行器,或者为你自己的执行器提供足够的线程。
看来那边的线程数逻辑是这样的:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor
.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
因此,我们在提供 SQS 队列时拥有确切的线程数,外加 2
工作线程乘数。看起来我们需要一个线程让每个队列进行轮询,并需要额外的线程来处理来自它们的消息。
(不是 Spring 集成问题 - 更像是 Spring Cloud AWS)。