重置监听器中的预取计数(rabbitmq,spring-amqp)
reset prefetch count in Listener (rabbitmq, spring-amqp)
我正在使用 spring-amqp。
如何在实现 ChannelAwareMessageListener 的侦听器中重置预取计数。
public class TestListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
if (some conditions) {
// the prefetch count has been initialized to 1 in the SimpleMessageListenerContainer
// here I want to reset the prefetch count
channel.basicQos(10, true); // not working, I want to request 10 messages next time
// I can do this way, following code work as expected, but is this the right way?
container.stop(); // SimpleMessageListenerContainer
container.setPrefetchCount(10);
container.start();
}
}
}
简而言之,我想在侦听器中动态重置预取计数。
更改通道上的预取只会影响在该通道上创建的新消费者。现有消费者获得创建时通道上的 qos 预取。
是的,停止并重新启动容器即可。
但是,您不应该在侦听器线程上执行此操作,您应该为 stop/start 使用任务执行器;否则 stop() 将延迟 5 秒(默认情况下)等待消费者线程 return 到容器(因此你不应该 运行 stop()
在监听线程上) .
或者您可以减少 shutdownTimeout
我正在使用 spring-amqp。
如何在实现 ChannelAwareMessageListener 的侦听器中重置预取计数。
public class TestListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
if (some conditions) {
// the prefetch count has been initialized to 1 in the SimpleMessageListenerContainer
// here I want to reset the prefetch count
channel.basicQos(10, true); // not working, I want to request 10 messages next time
// I can do this way, following code work as expected, but is this the right way?
container.stop(); // SimpleMessageListenerContainer
container.setPrefetchCount(10);
container.start();
}
}
}
简而言之,我想在侦听器中动态重置预取计数。
更改通道上的预取只会影响在该通道上创建的新消费者。现有消费者获得创建时通道上的 qos 预取。
是的,停止并重新启动容器即可。
但是,您不应该在侦听器线程上执行此操作,您应该为 stop/start 使用任务执行器;否则 stop() 将延迟 5 秒(默认情况下)等待消费者线程 return 到容器(因此你不应该 运行 stop()
在监听线程上) .
或者您可以减少 shutdownTimeout