"Channel shutdown: connection error"

"Channel shutdown: connection error"

我已经在 AWS 上设置了一个具有两个节点的 RabbitMQ 集群,并按照 here 所述启用了 HA。然后,我设置了一个 Elastic Load Balancer 映射 5672 到实例的 5672 端口,并定期检查实例的 15672 端口(HTTP 管理端口)。然后,我启动了两个侦听器(每个节点一个),每个侦听器有 4 个消费者,并将 spring.rabbitmq.host 指向负载均衡器的 DNS。但是,我会定期收到以下错误:

[E] [2015-03-14 23:13:23,890] [pool-4-thread-9         ] [CachingConnectionFactory        ] [                                    ] Channel shutdown: connection error
[E] [2015-03-14 23:13:23,891] [pool-4-thread-9         ] [CachingConnectionFactory        ] [                                    ] Channel shutdown: connection error
[E] [2015-03-14 23:13:23,891] [pool-4-thread-9         ] [CachingConnectionFactory        ] [                                    ] Channel shutdown: connection error
[E] [2015-03-14 23:13:23,891] [pool-4-thread-9         ] [CachingConnectionFactory        ] [                                    ] Channel shutdown: connection error
[W] [2015-03-14 23:13:24,758] [impleAsyncTaskExecutor-5] [SimpleMessageListenerContainer  ] [                                    ] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar!/:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[?:1.8.0_40]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar!/:?]
    ... 1 more
[I] [2015-03-14 23:13:24,761] [impleAsyncTaskExecutor-5] [SimpleMessageListenerContainer  ] [                                    ] Restarting Consumer: tags=[[amq.ctag-H-1ed6xO3GL7qW58bkLUZA]], channel=Cached Rabbit Channel: AMQChannel(amqp://qube@10.100.43.76:5672qube,1), acknowledgeMode=AUTO local queue size=0
[W] [2015-03-14 23:13:24,762] [impleAsyncTaskExecutor-8] [SimpleMessageListenerContainer  ] [                                    ] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar!/:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[?:1.8.0_40]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar!/:?]
    ... 1 more
[I] [2015-03-14 23:13:24,762] [impleAsyncTaskExecutor-8] [SimpleMessageListenerContainer  ] [                                    ] Restarting Consumer: tags=[[amq.ctag-nJAmieDx-kSuxl9arsXiww]], channel=Cached Rabbit Channel: AMQChannel(amqp://qube@10.100.43.76:5672qube,2), acknowledgeMode=AUTO local queue size=0
[W] [2015-03-14 23:13:24,767] [impleAsyncTaskExecutor-7] [SimpleMessageListenerContainer  ] [                                    ] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar!/:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[?:1.8.0_40]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar!/:?]
    ... 1 more
[I] [2015-03-14 23:13:24,768] [impleAsyncTaskExecutor-7] [SimpleMessageListenerContainer  ] [                                    ] Restarting Consumer: tags=[[amq.ctag-PAQvN8P57_9oiElyqYRJWw]], channel=Cached Rabbit Channel: AMQChannel(amqp://qube@10.100.43.76:5672qube,3), acknowledgeMode=AUTO local queue size=0
[W] [2015-03-14 23:13:24,768] [impleAsyncTaskExecutor-6] [SimpleMessageListenerContainer  ] [                                    ] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar!/:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[?:1.8.0_40]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar!/:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar!/:?]
    ... 1 more
[I] [2015-03-14 23:13:24,769] [impleAsyncTaskExecutor-6] [SimpleMessageListenerContainer  ] [                                    ] Restarting Consumer: tags=[[amq.ctag-w6apIlL78ViAnTOzx2Qejg]], channel=Cached Rabbit Channel: AMQChannel(amqp://qube@10.100.43.76:5672qube,4), acknowledgeMode=AUTO local queue size=0

如果我将每个消费者直接指向一个节点(不使用负载均衡器),则不会发生这种情况。是什么导致了这种行为?我怎样才能避免这种情况?

Ping 管理端口不会使 amqp 连接保持活动状态。

您需要将心跳配置为足够低的值以防止负载平衡器丢弃 "idle" 连接。

看起来您可能正在使用 Spring 引导(基于 属性 名称)并且它目前没有将 requestedHeartbeats 公开为 属性;您需要通过使用底层 rabbitmq 连接工厂心跳设置连接您自己的 rabbitConnectionFactory bean 来覆盖引导工厂。有关详细信息,请参阅 documentation

也就是说,使用带有 Spring AMQP 的负载均衡器没有任何好处,因为它使用单个长期连接。请考虑在 addresses 属性 中配置您的两个实例,如果第一个实例不可用,它将故障转移到第二个实例。