将多个 @RabbitListener bean 添加到 ContainerFactory 的优雅方式

Elegant way to add multiple @RabbitListener beans to a ContainerFactory

这是我的@Configuration

   @Bean
    public AmqpAdmin amqpAdmin()
    {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());

        DirectExchange dirExchange = new DirectExchange("evtExchange", true,
                false);

        rabbitAdmin.declareExchange(dirExchange);
        rabbitAdmin.declareQueue(processQueue);
        Binding processBinding = BindingBuilder.bind(processQueue)
                .to(dirExchange).with("rkey.process");
        rabbitAdmin.declareBinding(processBinding);

        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(processQueue);
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor()
    {
        return RetryInterceptorBuilder.stateless().maxAttempts(5)
                .backOffOptions(1000, 2.0, 10000).recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    @Bean
    public ProcessQueueListener processListener()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener2()
    {
        return new ProcessQueueListener();
    }

    @Bean
    public ProcessQueueListener processListener3()
    {
        return new ProcessQueueListener();
    }

这里是 @RabbitListener class

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "process")
public class ProcessQueueListener
{

    public ProcessQueueListener()
    {
    }

    @RabbitHandler
    void receiveMessage(String message)
    {
        // doSomething
    }

} 

仅当我分别实例化 processListener()processListener2()processListener3() 时,我才开始在 RabbitMQ Admin 中看到进程队列的多个消费者,并且每个侦听器都在处理消息,否则,尽管指定了 setConcurrentConsumers()

,但我只看到一个消费者

有没有一种优雅的方法可以按需声明多个监听器,根据需要增减。或者声明多个 @Bean 是唯一的选择?还是我做错了什么?

您使用的是什么版本?

我刚刚复制了你的容器工厂,它对我来说工作正常 (2.1.3)...

顺便说一句,从 2.0 版开始,您可以将 concurrency 添加到 @RabbitListener,它将覆盖容器工厂中的任何值。

/**
 * Set the concurrency of the listener container for this listener. Overrides the
 * default set by the listener container factory. Maps to the concurrency setting of
 * the container type.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
 * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
 * number of consumers in the {@code concurrentConsumers} property. If it is a string
 * with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m}
 * and the {@code maxConcurrentConsumers} is set to {@code n}.
 * <p>For a
 * {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer
 * DirectMessageListenerContainer} it sets the {@code consumersPerQueue} property.
 * @return the concurrency.
 * @since 2.0
 */
String concurrency() default "";

此外,不相关,但您不应该在 bean 声明中这样做 rabbitAdmin.declareExchange(dirExchange) - 在应用程序上下文生命周期中连接到 RabbitMQ 还为时过早。将交换器、队列和绑定添加为 @Beans,管理员将自动找到并声明它们。