Spring AMQP 在队列 Bean 上设置 prefetchCount
Spring AMQP Set prefetchCount on Queue Bean
如何为这个队列消费者设置 prefetchCount
?
@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
...
.build();
TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);
amqpAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(topicExchange)
.with(EVENT_ROUTING_KEY));
return queue;
}
documentation 注意到 prefetchCount
这是一个容器配置,但是在我的工厂 bean 上设置它并没有起作用,默认值是 250
:
@Bean
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(10); // doesn't work; defaults to 250
return factory;
}
更新
根据下面@GaryRussell 的评论,我确实测试了默认 rabbitListenerContainerFactory
并且还验证了我的 Spring 引导配置 spring.rabbitmq.listener.simple.prefetch
在 AbstractRabbitListenerContainerFactoryConfigurer
中被消耗了。但是,当我查看 RabbitMQ 中的队列消费者时,我可以看到我使用默认容器设置定义的队列仍然具有 250:
的 prefetchCount
我使用 RabbitMQ 管理面板作为真实来源。我不认为它在说谎,因为我有一堆用自定义容器实例化的动态队列,并且它们确实具有非默认(正确)prefetchCount
s。我还在 Spring 容器启动中验证了只有一个(预期的)rabbitListenerContainerFactory
bean。
预取不是队列属性,它是消费者属性。
你的听众长什么样?
您使用的是容器工厂的非标准名称。
您需要将 containerFactory
属性 添加到 @RabbitListener
或者您需要将 bean 重命名为 rabbitListenerContainerFactory
(覆盖 Boot 定义的工厂 @Bean
).
还有
amqpAdmin.declareBinding(BindingBuilder
您不应该在 bean 定义中与 broker 交谈 - 现在还为时过早。
只需将您的队列、交换器和绑定添加为 @Bean
s 和
您也可以只在 application.properties/yaml 文件中设置预取(如果您使用的是 Spring Boot)。管理员会在第一次打开连接时找到它们并声明它们。
编辑
还有其他事情发生...
@SpringBootApplication
public class So62049769Application {
public static void main(String[] args) {
SpringApplication.run(So62049769Application.class, args);
}
@Bean
public Queue queue() {
return new Queue("so62049769");
}
@RabbitListener(queues = "so62049769")
public void listen(String in) {
System.out.println(in);
}
}
spring.rabbitmq.listener.simple.prefetch=42
如何为这个队列消费者设置 prefetchCount
?
@Bean
public Queue eventQueue(AmqpAdmin amqpAdmin) {
Queue queue = QueueBuilder.durable(EVENT_QUEUE_NAME)
...
.build();
TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE, true, false);
amqpAdmin.declareBinding(BindingBuilder
.bind(queue)
.to(topicExchange)
.with(EVENT_ROUTING_KEY));
return queue;
}
documentation 注意到 prefetchCount
这是一个容器配置,但是在我的工厂 bean 上设置它并没有起作用,默认值是 250
:
@Bean
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(10); // doesn't work; defaults to 250
return factory;
}
更新
根据下面@GaryRussell 的评论,我确实测试了默认 rabbitListenerContainerFactory
并且还验证了我的 Spring 引导配置 spring.rabbitmq.listener.simple.prefetch
在 AbstractRabbitListenerContainerFactoryConfigurer
中被消耗了。但是,当我查看 RabbitMQ 中的队列消费者时,我可以看到我使用默认容器设置定义的队列仍然具有 250:
prefetchCount
我使用 RabbitMQ 管理面板作为真实来源。我不认为它在说谎,因为我有一堆用自定义容器实例化的动态队列,并且它们确实具有非默认(正确)prefetchCount
s。我还在 Spring 容器启动中验证了只有一个(预期的)rabbitListenerContainerFactory
bean。
预取不是队列属性,它是消费者属性。
你的听众长什么样?
您使用的是容器工厂的非标准名称。
您需要将 containerFactory
属性 添加到 @RabbitListener
或者您需要将 bean 重命名为 rabbitListenerContainerFactory
(覆盖 Boot 定义的工厂 @Bean
).
还有
amqpAdmin.declareBinding(BindingBuilder
您不应该在 bean 定义中与 broker 交谈 - 现在还为时过早。
只需将您的队列、交换器和绑定添加为 @Bean
s 和
您也可以只在 application.properties/yaml 文件中设置预取(如果您使用的是 Spring Boot)。管理员会在第一次打开连接时找到它们并声明它们。
编辑
还有其他事情发生...
@SpringBootApplication
public class So62049769Application {
public static void main(String[] args) {
SpringApplication.run(So62049769Application.class, args);
}
@Bean
public Queue queue() {
return new Queue("so62049769");
}
@RabbitListener(queues = "so62049769")
public void listen(String in) {
System.out.println(in);
}
}
spring.rabbitmq.listener.simple.prefetch=42