Is it possible to set the prefetch count on @RabbitListener?
I know it is possible to define a SimpleMessageListenerContainer bean and set the prefetch count and the message listener there, like this:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory rabbitConnectionFactory,
        Receiver receiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setQueueNames("hello");
    container.setMessageListener(new MessageListenerAdapter(receiver, "receive"));
    container.setPrefetchCount(1000);
    return container;
}
But if I want to use the declarative approach with @RabbitListener, how do I set the prefetch count for the channel?
@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello") // how to set prefetch count here?
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }
}
Is this not possible?
@RabbitListener has a containerFactory option:
/**
 * The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * to use to create the message listener container responsible to serve this endpoint.
 * <p>If not specified, the default container factory is used, if any.
 * @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * bean name.
 */
String containerFactory() default "";
There you can configure a SimpleRabbitListenerContainerFactory with the desired prefetchCount, and the SimpleMessageListenerContainer created for that annotation will have that option applied.
Solution based on @artem-bilan's answer:
In some @Configuration class, declare a RabbitListenerContainerFactory bean with a prefetch count of 10:
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory(
        ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(10);
    return factory;
}
The Receiver bean uses this factory bean:
@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

    @RabbitListener(queues = "hello")
    public void receiveWithoutPrefetch(String message) {
        log.info(" [x] Received without prefetch '{}'.", message);
    }
}
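The two listeners here are for demonstration purposes only.
With this configuration Spring creates two AMQP channels, one for each @RabbitListener. The first uses our new prefetchTenRabbitListenerContainerFactory bean with a prefetch count of 10; the second uses the default rabbitListenerContainerFactory bean with a prefetch count of 1. (A default of 1 applies to Spring AMQP 1.x; since version 2.0 the default prefetch count is 250.)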
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // PropertiesLoader appears to be an application-specific helper, not a Spring class.
    int concurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.concurrent-consumers"));
    int maxConcurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.max-concurrent-consumers"));
    int consecutiveActiveTrigger = Integer.parseInt(PropertiesLoader.getProperty("queue.consecutive-active-trigger"));
    int prefetchCount = Integer.parseInt(PropertiesLoader.getProperty("queue.prefetch_count"));
    factory.setPrefetchCount(prefetchCount);
    factory.setConcurrentConsumers(concurrentConsumers);
    factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
    factory.setConsecutiveActiveTrigger(consecutiveActiveTrigger);
    return factory;
}
I am using Spring Boot 2.3.3. I changed the following in application.properties and it worked.
spring.rabbitmq.listener.direct.prefetch=1000
spring.rabbitmq.listener.simple.prefetch=1000
I don't know the difference between direct and simple, so I set both.
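For context on the two property groups: as far as I know, Spring Boot's spring.rabbitmq.listener.type property selects which container type the auto-configured listener factory creates, either simple (SimpleMessageListenerContainer, the default) or direct (DirectMessageListenerContainer), and only the matching property group is applied to that factory. A minimal sketch, assuming the default container type is in use and reusing the value 1000 from above:

```properties
# Container type for the auto-configured listener factory (simple is the default)
spring.rabbitmq.listener.type=simple
# Prefetch for the simple container; the direct.* setting should not be needed in this case
spring.rabbitmq.listener.simple.prefetch=1000
```

If the application switches the type to direct, the spring.rabbitmq.listener.direct.prefetch property would be the one to set instead.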
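If you want a different prefetch for each queue, you can create a listener container factory with the default Spring settings and override only the prefetch.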
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchRabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // Apply Spring Boot's default listener settings first.
    configurer.configure(factory, connectionFactory);
    // Then override only the prefetch; "prefetch" is the desired value, defined elsewhere (not shown here).
    factory.setPrefetchCount(prefetch);
    return factory;
}