Spring AMQP reconnection issue with RabbitMQ cluster due to queue checking retry limit

I have a RabbitMQ cluster with 3 nodes. One node hosts a durable, non-mirrored classic queue named test-queue.

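I have a Spring Boot application that uses Spring AMQP with the default connection factory, new CachingConnectionFactory(). It first ensures the queue exists and then subscribes to its messages. Everything works fine.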

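Then I started a rolling update of the RabbitMQ cluster, in which the nodes are restarted one by one.

I observed the following in the logs during this process:

At the beginning I see the following output: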

Received shutdown signal for consumer tag=amq.ctag-pzPHM_GEd5e-J5Y_L2W7_g com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
...
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Attempting to connect to: xxx:5672
...
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Created new connection: xxx#66971f6b:58/SimpleConnection@4315e774 

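This shows that the application received the shutdown signal and reconnected successfully. At this point it looks like the node hosting the queue is down, but the application can still establish a new connection because the other nodes are up.

Later I see more shutdown signals, indicating that another node has started to shut down: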

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Consumer raised exception, processing can restart if the connection factory supports it com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced 

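At the same time I noticed the log below, which suggests that although it is connected, Spring AMQP cannot find the queue. I guess this is because the node hosting the queue is down. Spring AMQP is probably checking the other nodes; it believes the queue does not exist, so it starts trying to re-create it. Also note the retry limit of 3: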

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer[m][] - Failed to declare queue: test-queue
Queue declaration failed; retries left=3 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[test-queue]
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - queue 'test-queue' in vhost '/' process is stopped by supervisor, class-id=50, method-id=10)

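Finally, the retries are exhausted and I see the following. It looks like Spring AMQP gives up and starts shutting everything down. The end state is that no consumer is registered on the queue. The Spring application is still running but cannot receive messages, and it no longer retries the way it does for a dropped connection. The only fix is to restart the application.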

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Cancelling Consumer@7f74d6dd: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@xxx:5672/,26), conn: Proxy@65ef722a Shared Rabbit Connection: SimpleConnection@4315e774 [delegate=amqp://guest@xxx:5672/, localPort= 37208], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer[m][] - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@xxx:5672/,26), conn: Proxy@65ef722a Shared Rabbit Connection: SimpleConnection@4315e774 [delegate=amqp://guest@xxx:5672/, localPort= 37208]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Closing cached Channel: AMQChannel
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Shutting down Rabbit listener container

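I understand that Spring AMQP comes with retry logic for disconnections and will keep reconnecting indefinitely. But in this situation, how can I make Spring wait until the cluster restart is complete and then start reconnecting? Or is there a way to disable the retry limit on the queue check, so that it keeps checking for the queue until the cluster restart has finished instead of giving up early? Would changing the queue to a mirrored queue or a quorum queue solve this?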

https://docs.spring.io/spring-amqp/docs/current/reference/html/#declarationRetries

The number of retry attempts when passive queue declaration fails. Passive queue declaration occurs when the consumer starts or, when consuming from multiple queues, when not all queues were available during initialization. When none of the configured queues can be passively declared (for any reason) after the retries are exhausted, the container behavior is controlled by the 'missingQueuesFatal' property, described earlier.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#failedDeclarationRetryInterval

The interval between passive queue declaration retry attempts. Passive queue declaration occurs when the consumer starts or, when consuming from multiple queues, when not all queues were available during initialization.
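To get a feel for how these two properties interact, here is a rough back-of-the-envelope sketch in plain Java. It assumes the consumer sleeps for roughly one interval per retry, so the retry window is approximately declarationRetries × failedDeclarationRetryInterval (about 15 seconds with 3 retries at 5000 ms). The class and method names, and the 10-minute outage figure, are made up for illustration and are not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper, not part of Spring AMQP: estimates how long the
// passive-declaration retries last and how many retries an outage needs.
final class DeclarationRetryWindow {

    /** Approximate retry window, assuming one sleep of intervalMillis per retry. */
    static Duration window(int declarationRetries, long intervalMillis) {
        return Duration.ofMillis(Math.multiplyExact((long) declarationRetries, intervalMillis));
    }

    /** Smallest retry count whose window covers the outage (ceiling division). */
    static int retriesToCover(Duration outage, long intervalMillis) {
        long needed = (outage.toMillis() + intervalMillis - 1) / intervalMillis;
        return Math.toIntExact(needed);
    }

    public static void main(String[] args) {
        // 3 retries at 5000 ms each
        System.out.println(window(3, 5000).getSeconds() + " s");
        // Assumed (hypothetical) 10-minute rolling restart, retrying every 5000 ms
        System.out.println(retriesToCover(Duration.ofMinutes(10), 5000) + " retries");
    }
}
```

With those assumptions, a window of about 15 seconds is far shorter than a typical rolling restart, which would explain why the container gives up before the node hosting the queue comes back.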

You can increase the defaults for one or both of these (3 and 5000, respectively).
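As a minimal sketch, assuming the listener container is declared explicitly as a SimpleMessageListenerContainer bean (the question does not show how the consumer is wired, so the class name, bean name, and the value 120 are illustrative), the two properties could be raised like this:

```java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class ListenerConfig { // hypothetical configuration class

    @Bean
    SimpleMessageListenerContainer testQueueContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("test-queue");

        // Illustrative values: 120 retries at 5000 ms is roughly a 10-minute window.
        // Pick numbers that comfortably exceed your own rolling-restart duration.
        container.setDeclarationRetries(120);
        container.setFailedDeclarationRetryInterval(5000);

        // Placeholder listener that just prints the message body.
        container.setMessageListener(message ->
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)));
        return container;
    }
}
```

The quoted documentation also points at missingQueuesFatal, which controls what the container does once the retries are exhausted; it may be worth checking in the reference docs whether changing it suits your setup better than a large retry count.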