如何使用@RabbitListener 优雅地停止消费消息
How to gracefully stop consuming messages with @RabbitListener
有没有办法优雅地停止 ListenerContainer
及其关联的 Consumers
。
我正在努力实现的目标。
- 停止消费消息。
- 优雅地停下来
ListenerContainer
.
- 等待很长时间 运行 消费者,完成后确认。
我可以使用 consumer.stop()
停止 ListenerContainers
,但是活跃的 long 运行 消费者不会成功完成,处理的消息不会被确认,因此一旦 ListenerContainer 恢复,将再次处理。
输出
Waiting for workers to finish.
Workers not finished.
Closing channel for unresponsive consumer: Consumer@6d229b1c
消息已处理,但未确认。
我也许可以使用 setForceCloseChannel(false)
实现正常关闭,但是是否可以验证取消的消费者是否已完成? SimpleMessageListenerContainer.doShutDown() 有一个局部作用域列表 "canceledConsumers".
增加关机超时时间。
参见Message Listener Container Configuration。
shutdownTimeout
When a container shuts down (for example, if its enclosing ApplicationContext is closed), it waits for in-flight messages to be processed up to this limit. Defaults to five seconds.
/**
* The time to wait for workers in milliseconds after the container is stopped. If any
* workers are active when the shutdown signal comes they will be allowed to finish
* processing as long as they can finish within this timeout. Defaults
* to 5 seconds.
* @param shutdownTimeout the shutdown timeout to set
*/
public void setShutdownTimeout(long shutdownTimeout) {
要完成@user634545 的最后评论,我们必须处理 2 个属性:
必须调整这些参数以验证以下内容:
prefetchCount < (shutdownTimeout / consumerExecutionTimePerMessage)
这意味着在收到关闭命令后,消费者应该能够消费并知道每条预取消息。
有没有办法优雅地停止 ListenerContainer
及其关联的 Consumers
。
我正在努力实现的目标。
- 停止消费消息。
- 优雅地停下来
ListenerContainer
. - 等待很长时间 运行 消费者,完成后确认。
我可以使用 consumer.stop()
停止 ListenerContainers
,但是活跃的 long 运行 消费者不会成功完成,处理的消息不会被确认,因此一旦 ListenerContainer 恢复,将再次处理。
输出
Waiting for workers to finish.
Workers not finished.
Closing channel for unresponsive consumer: Consumer@6d229b1c
消息已处理,但未确认。
我也许可以使用 setForceCloseChannel(false)
实现正常关闭,但是是否可以验证取消的消费者是否已完成? SimpleMessageListenerContainer.doShutDown() 有一个局部作用域列表 "canceledConsumers".
增加关机超时时间。
参见Message Listener Container Configuration。
shutdownTimeout
When a container shuts down (for example, if its enclosing ApplicationContext is closed), it waits for in-flight messages to be processed up to this limit. Defaults to five seconds.
/**
* The time to wait for workers in milliseconds after the container is stopped. If any
* workers are active when the shutdown signal comes they will be allowed to finish
* processing as long as they can finish within this timeout. Defaults
* to 5 seconds.
* @param shutdownTimeout the shutdown timeout to set
*/
public void setShutdownTimeout(long shutdownTimeout) {
要完成@user634545 的最后评论,我们必须处理 2 个属性:
必须调整这些参数以验证以下内容:
prefetchCount < (shutdownTimeout / consumerExecutionTimePerMessage)
这意味着在收到关闭命令后,消费者应该能够消费并知道每条预取消息。