如何防止 Spring AMQP 阻塞未确认的消息?
How to Prevent Spring AMQP from Blocking on Unacked Messages?
我有一个@RabbitListener
annotated method for which Spring AMQP blocks after returning from the method. The underlying SimpleRabbitListenerContainerFactory
uses AcknowledgeMode.MANUAL
。我还不想在侦听器方法中确认消息。
在这种情况下,有没有办法不让 Spring AMQP 阻塞?
更详细
我使用这样的监听器:
@RabbitListener(queues = "#{ @myQueue }")
void recordRequestsFromMyMessages(
@Payload MyMessage myMessagePayload,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) {
// record relevant parts of the given message and combine them with
// parts from previous/future messages
// DON'T acknowledge the consumed message, yet; instead only keep a
// record of the channel and the delivery tag
}
因为我 batch/combine 在我实际处理它们之前(异步)稍后我有多个消息,我不想立即确认消费的消息。相反,我只想在稍后成功处理消息后执行此操作。
使用我当前的方法,Spring AMQP 在调用上面的 recordRequestsFromMyMessages
方法返回后阻塞,并且不再使用同一队列中的更多消息。
This SO answer 建议批处理应该工作,但是,我不确定如何。
"blocking" 不是容器。
您需要增加容器上的 prefetchCount
(默认为 1)- 代理只允许该数量的未确认消息未完成。
我有一个@RabbitListener
annotated method for which Spring AMQP blocks after returning from the method. The underlying SimpleRabbitListenerContainerFactory
uses AcknowledgeMode.MANUAL
。我还不想在侦听器方法中确认消息。
在这种情况下,有没有办法不让 Spring AMQP 阻塞?
更详细
我使用这样的监听器:
@RabbitListener(queues = "#{ @myQueue }")
void recordRequestsFromMyMessages(
@Payload MyMessage myMessagePayload,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) {
// record relevant parts of the given message and combine them with
// parts from previous/future messages
// DON'T acknowledge the consumed message, yet; instead only keep a
// record of the channel and the delivery tag
}
因为我 batch/combine 在我实际处理它们之前(异步)稍后我有多个消息,我不想立即确认消费的消息。相反,我只想在稍后成功处理消息后执行此操作。
使用我当前的方法,Spring AMQP 在调用上面的 recordRequestsFromMyMessages
方法返回后阻塞,并且不再使用同一队列中的更多消息。
This SO answer 建议批处理应该工作,但是,我不确定如何。
"blocking" 不是容器。
您需要增加容器上的 prefetchCount
(默认为 1)- 代理只允许该数量的未确认消息未完成。