将多个 @RabbitListener bean 添加到 ContainerFactory 的优雅方式
Elegant way to add multiple @RabbitListener beans to a ContainerFactory
这是我的@Configuration
@Bean
public AmqpAdmin amqpAdmin()
{
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
DirectExchange dirExchange = new DirectExchange("evtExchange", true,
false);
rabbitAdmin.declareExchange(dirExchange);
rabbitAdmin.declareQueue(processQueue);
Binding processBinding = BindingBuilder.bind(processQueue)
.to(dirExchange).with("rkey.process");
rabbitAdmin.declareBinding(processBinding);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
SimpleMessageListenerContainer container = factory
.createListenerContainer();
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
container.setStartConsumerMinInterval(3000);
container.setQueues(processQueue);
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor()
{
return RetryInterceptorBuilder.stateless().maxAttempts(5)
.backOffOptions(1000, 2.0, 10000).recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public ProcessQueueListener processListener()
{
return new ProcessQueueListener();
}
@Bean
public ProcessQueueListener processListener2()
{
return new ProcessQueueListener();
}
@Bean
public ProcessQueueListener processListener3()
{
return new ProcessQueueListener();
}
这里是 @RabbitListener
class
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{
public ProcessQueueListener()
{
}
@RabbitHandler
void receiveMessage(String message)
{
// doSomething
}
}
仅当我分别实例化 processListener()
、processListener2()
和 processListener3()
时,我才开始在 RabbitMQ Admin 中看到进程队列的多个消费者,并且每个侦听器都在处理消息,否则,尽管指定了 setConcurrentConsumers()
,但我只看到一个消费者
有没有一种优雅的方法可以按需声明多个监听器,根据需要增减。或者声明多个 @Bean
是唯一的选择?还是我做错了什么?
您使用的是什么版本?
我刚刚复制了你的容器工厂,它对我来说工作正常 (2.1.3)...
顺便说一句,从 2.0 版开始,您可以将 concurrency
添加到 @RabbitListener
,它将覆盖容器工厂中的任何值。
/**
* Set the concurrency of the listener container for this listener. Overrides the
* default set by the listener container factory. Maps to the concurrency setting of
* the container type.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
* SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
* number of consumers in the {@code concurrentConsumers} property. If it is a string
* with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
* and the {@code maxConcurrentConsumers} is set to {@code n}.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
* DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
* @return the concurrency.
* @since 2.0
*/
String concurrency() default "";
此外,不相关,但您不应该在 bean 声明中这样做 rabbitAdmin.declareExchange(dirExchange)
- 在应用程序上下文生命周期中连接到 RabbitMQ 还为时过早。将交换器、队列和绑定添加为 @Bean
s,管理员将自动找到并声明它们。
这是我的@Configuration
@Bean
public AmqpAdmin amqpAdmin()
{
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
DirectExchange dirExchange = new DirectExchange("evtExchange", true,
false);
rabbitAdmin.declareExchange(dirExchange);
rabbitAdmin.declareQueue(processQueue);
Binding processBinding = BindingBuilder.bind(processQueue)
.to(dirExchange).with("rkey.process");
rabbitAdmin.declareBinding(processBinding);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
SimpleMessageListenerContainer container = factory
.createListenerContainer();
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
container.setStartConsumerMinInterval(3000);
container.setQueues(processQueue);
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor()
{
return RetryInterceptorBuilder.stateless().maxAttempts(5)
.backOffOptions(1000, 2.0, 10000).recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public ProcessQueueListener processListener()
{
return new ProcessQueueListener();
}
@Bean
public ProcessQueueListener processListener2()
{
return new ProcessQueueListener();
}
@Bean
public ProcessQueueListener processListener3()
{
return new ProcessQueueListener();
}
这里是 @RabbitListener
class
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{
public ProcessQueueListener()
{
}
@RabbitHandler
void receiveMessage(String message)
{
// doSomething
}
}
仅当我分别实例化 processListener()
、processListener2()
和 processListener3()
时,我才开始在 RabbitMQ Admin 中看到进程队列的多个消费者,并且每个侦听器都在处理消息,否则,尽管指定了 setConcurrentConsumers()
有没有一种优雅的方法可以按需声明多个监听器,根据需要增减。或者声明多个 @Bean
是唯一的选择?还是我做错了什么?
您使用的是什么版本?
我刚刚复制了你的容器工厂,它对我来说工作正常 (2.1.3)...
顺便说一句,从 2.0 版开始,您可以将 concurrency
添加到 @RabbitListener
,它将覆盖容器工厂中的任何值。
/**
* Set the concurrency of the listener container for this listener. Overrides the
* default set by the listener container factory. Maps to the concurrency setting of
* the container type.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
* SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
* number of consumers in the {@code concurrentConsumers} property. If it is a string
* with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
* and the {@code maxConcurrentConsumers} is set to {@code n}.
* <p>For a
* {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
* DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
* @return the concurrency.
* @since 2.0
*/
String concurrency() default "";
此外,不相关,但您不应该在 bean 声明中这样做 rabbitAdmin.declareExchange(dirExchange)
- 在应用程序上下文生命周期中连接到 RabbitMQ 还为时过早。将交换器、队列和绑定添加为 @Bean
s,管理员将自动找到并声明它们。