是否可以在@RabbitListener 上设置预取计数

Is it possible to set prefetch count on @RabbitListener

我知道可以制作 SimpleMessageListenerContainer bean 并在此处设置预取计数和消息侦听器,如下所示:

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory rabbitConnectionFactory,
        Receiver receiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setQueueNames("hello");
    container.setMessageListener(new MessageListenerAdapter(receiver, "receive"));
    container.setPrefetchCount(1000);
    return container;
}

但是如果我想使用 @RabbitListener 的声明式方法,如何设置通道的预取计数?

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello") // how to set prefetch count here?
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

}

不可能?

@RabbitListenercontainerFactory 选项:

/**
 * The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * to use to create the message listener container responsible to serve this endpoint.
 * <p>If not specified, the default container factory is used, if any.
 * @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * bean name.
 */
String containerFactory() default "";

您可以在其中使用所需的 prefetchCount 配置 SimpleRabbitListenerContainerFactory,该注释的目标 SimpleMessageListenerContainer 将为您提供该选项。

根据@artem-bilan 回答的解决方案:

在某些 @Configuration class:

中声明 RabbitListenerContainerFactory 预取计数为 10 的 bean
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(10);
    return factory;
}

Receiver bean 使用这个工厂 bean:

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

    @RabbitListener(queues = "hello")
    public void receiveWithoutPrefetch(String message) {
        log.info(" [x] Received without prefetch '{}'.", message);
    }

}

此处的两个监听器仅用于演示目的。
使用此配置 Spring 创建两个 AMQP 通道。每个@RabbitListener一个。首先使用我们的新 prefetchTenRabbitListenerContainerFactory bean 进行预取计数 10,然后使用默认 rabbitListenerContainerFactory bean 进行预取计数 1。

   @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        int concurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.concurrent-consumers"));
        int maxConcurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.max-concurrent-consumers"));
        int consecutiveActiveTrigger = Integer.parseInt(PropertiesLoader.getProperty("queue.consecutive-active-trigger"));
        int prefectCount = Integer.parseInt(PropertiesLoader.getProperty("queue.prefetch_count"));

        factory.setPrefetchCount(prefectCount);
        factory.setConcurrentConsumers(concurrentConsumers);
        factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
        factory.setConsecutiveActiveTrigger(consecutiveActiveTrigger);
        return factory;
    }

我正在使用 Spring boot 2.3.3,我在 application.properties 中更改了以下内容并且它起作用了。

spring.rabbitmq.listener.direct.prefetch=1000
spring.rabbitmq.listener.simple.prefetch=1000

我不知道直接和简单的区别所以我都设置了。

如果您想为每个队列使用不同的预取,您可以使用默认 spring 设置创建侦听器工厂并仅覆盖预取。

@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setPrefetchCount(prefetch);
    return factory;
}