Spring AMQP 在队列 Bean 上设置 prefetchCount

Spring AMQP Set prefetchCount on Queue Bean

如何为这个队列消费者设置 prefetchCount

@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
    Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
            ...
            .build();

    TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);

    amqpAdmin.declareBinding(BindingBuilder
            .bind(queue)
            .to(topicExchange)
            .with(EVENT_ROUTING_KEY));

    return queue;
}

documentation 注意到 prefetchCount 这是一个容器配置,但是在我的工厂 bean 上设置它并没有起作用,默认值是 250:

    @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(10); // doesn't work; defaults to 250
        return factory;
    }

更新

根据下面@GaryRussell 的评论,我确实测试了默认 rabbitListenerContainerFactory 并且还验证了我的 Spring 引导配置 spring.rabbitmq.listener.simple.prefetchAbstractRabbitListenerContainerFactoryConfigurer 中被消耗了。但是,当我查看 RabbitMQ 中的队列消费者时,我可以看到我使用默认容器设置定义的队列仍然具有 250:

prefetchCount

我使用 RabbitMQ 管理面板作为真实来源。我不认为它在说谎,因为我有一堆用自定义容器实例化的动态队列,并且它们确实具有非默认(正确)prefetchCounts。我还在 Spring 容器启动中验证了只有一个(预期的)rabbitListenerContainerFactory bean。

预取不是队列属性,它是消费者属性。

你的听众长什么样?

您使用的是容器工厂的非标准名称。

您需要将 containerFactory 属性 添加到 @RabbitListener 或者您需要将 bean 重命名为 rabbitListenerContainerFactory(覆盖 Boot 定义的工厂 @Bean ).

还有

amqpAdmin.declareBinding(BindingBuilder

您不应该在 bean 定义中与 broker 交谈 - 现在还为时过早。

只需将您的队列、交换器和绑定添加为 @Beans 和 您也可以只在 application.properties/yaml 文件中设置预取(如果您使用的是 Spring Boot)。管理员会在第一次打开连接时找到它们并声明它们。

编辑

还有其他事情发生...

@SpringBootApplication
public class So62049769Application {

    public static void main(String[] args) {
        SpringApplication.run(So62049769Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("so62049769");
    }

    @RabbitListener(queues = "so62049769")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.rabbitmq.listener.simple.prefetch=42