Spring-amqp:使用匿名排他队列时的异常

Spring-amqp: Exceptions when using anonymous, exclusive queues

我有一系列 tomcat/spring 应用程序,需要使用几个不同的扇出交换将消息从多个不同服务器的单个实例发送到多个客户端。每个服务器实例都有自己的一组交换器。交换由服务器声明,匿名队列由每个客户端生成并绑定到这些交换。我使用匿名队列是因为我事先不知道会有多少消费者,当然我不能在不破坏扇出交换的性质的情况下将多个消费者绑定到一个命名队列。在客户端,我在日志中收到大量与尝试重新声明或删除这些匿名队列有关的异常。

为了画得更清楚:假设我有 2 个服务,服务 A 和服务 B,每个服务都发布到一对扇出交换器——比如 ExchangeA1、ExchangeA2 用于服务 A,ExchangeB1、ExchangeB2 用于服务B. 现在假设我有一个客户端建立随机命名的队列,绑定到这 4 个交换中的每一个,以便它可以监视来自两个服务的消息。

消息从服务器端成功发送到交换器,没有问题,所以我将放弃服务器端配置,但如果它变得相关,我可以提供它。这是客户端 spring-config:

<rabbit:queue id="ClientQ1" />
<rabbit:queue id="ClientQ2" />

<!-- declare the exchanges so we can bind to them -->
<rabbit:fanout-exchange id="ExchangeA1.id" name="ExchangeA1">
    <rabbit:bindings>
        <rabbit:binding queue="ClientQ1"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:fanout-exchange id="ExchangeA2.id" name="ExchangeA2">
    <rabbit:bindings>
        <rabbit:binding queue="ClientQ2"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:connection-factory id="rabbitConnectionFactory"  <!-- this is a simple singleton bean that establishes an SSL connection to rabbitmq -->
                        connection-factory="rabbitMQConnectionFactoryBean"
                        publisher-confirms="true"
                        channel-cache-size="5" />

<rabbit:admin id="myRabbitAdmin" connection-factory="rabbitConnectionFactory"/>

<rabbit:listener-container id="rabbitListenerContainer"
                           acknowledge="auto"
                           prefetch="1000"
                           connection-factory="rabbitConnectionFactory"
                           message-converter="EventMessageConverterBean" >

    <rabbit:listener ref="Q1ListenerBean"
                     admin="myRabbitAdmin"
                     method="processEvent"
                     queue-names="#{ClientQ1.name}" />

    <rabbit:listener ref="Q2ListenerBean"
                     admin="myRabbitAdmin"
                     method="processEvent"
                     queue-names="#{ClientQ2.name}" />

</rabbit:listener-container>

现在,想象一下,在这个客户端的其他一些依赖项中,相同的配置基本上重复绑定到 ExchangeB(1,2)。

启动时,我看到大量遵循以下模式的异常:

ERROR - SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(1158) | Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:481)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1083)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4d2b49c-140f-4893-a764-4c84f945d482]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:554)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:453)
... 2 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:801)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:666)
at com.sun.proxy.$Proxy114.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:533)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4d2b49c-140f-4893-a764-4c84f945d482' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 11 more

我的调试引导我使用 SimpleMessageListenerContainer(:954) 中的 redeclareElementsIfNecessary() 方法,如果有任何缺失,它会尝试重新声明所有 queues/exhcnages/bindings。这会失败,因为 rabbitmq 不允许重新分配独占队列。 (或者我认为)

搜索类似的问题让我找到了这个:Spring AMQP v1.4.2 - Rabbit reconnection issue on network failure,这是切线相关的,但解决方案不适用于我的问题。由于匿名队列本质上是排他性的,而我必须使用匿名队列,所以我似乎进退两难。我的问题是:

  1. 我做错了什么吗w/r/t我的监听器配置?我可以通过某种方式更改我的配置以缓解此问题吗?
  2. 为什么要调用 redeclareElementsIfNecessary?在我看来,所有有问题的队列都应该在第一次声明后存在。
  3. 这些错误真的无害吗?如果是这样,我该如何拦截它们以避免所有异常出现在我的日志中?

如有任何信息,我们将不胜感激。

spring-amqp 1.4.6

rabbitmq 3.6.10

更新 1 一些相关的 spring-调试日志输出与 rabbit:admin:

相关
17:22:14,783 DEBUG RabbitAdmin:382 - Initializing declarations
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#0'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'consumer.index.amqp.consumerQueue'
17:22:14,785 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#0'
17:22:14,785 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#1'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#2'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#3'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#4'
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (07475112-7be0-4c8d-b6e5-3279e81a1aff) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (e4825008-954f-4154-bd61-9b67e0ff582e) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (001bd357-6203-49a6-b573-7f1a998ff750) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3880a65f-0104-4c99-96ac-1958ace7e2e0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3b1bc32a-872e-4fc3-ba2a-de200bc7b758) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,791  INFO CachingConnectionFactory:213 - Created new connection: SimpleConnection@3158bf2b [delegate=amqp://admin@172.30.12.59:5671/]
17:22:14,791 DEBUG RabbitAdmin:382 - Initializing declarations
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange1'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange2'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange1'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange2'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#0'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'consumer.index.amqp.consumerQueue'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#0'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#3'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#4'
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (07475112-7be0-4c8d-b6e5-3279e81a1aff) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (e4825008-954f-4154-bd61-9b67e0ff582e) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (001bd357-6203-49a6-b573-7f1a998ff750) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3880a65f-0104-4c99-96ac-1958ace7e2e0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3b1bc32a-872e-4fc3-ba2a-de200bc7b758) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,811 DEBUG CachingConnectionFactory:395 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,811 DEBUG CachingConnectionFactory:395 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,821 DEBUG RabbitTemplate:1045 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,821 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange1'
17:22:14,822 DEBUG RabbitTemplate:1045 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,825 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange1'
17:22:14,827 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange2'
17:22:14,828 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange2'
17:22:14,829 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange1'
17:22:14,830 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange1'
17:22:14,832 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange2'
17:22:14,835 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange2'
17:22:14,836 DEBUG RabbitAdmin:472 - declaring Queue '07475112-7be0-4c8d-b6e5-3279e81a1aff'
17:22:14,837 DEBUG RabbitAdmin:472 - declaring Queue '07475112-7be0-4c8d-b6e5-3279e81a1aff'
17:22:14,843 DEBUG RabbitAdmin:472 - declaring Queue 'e4825008-954f-4154-bd61-9b67e0ff582e'
17:22:14,845 DEBUG RabbitAdmin:472 - declaring Queue '001bd357-6203-49a6-b573-7f1a998ff750'
17:22:14,848 DEBUG RabbitAdmin:472 - declaring Queue '3880a65f-0104-4c99-96ac-1958ace7e2e0'
17:22:14,853 DEBUG RabbitAdmin:472 - declaring Queue '3b1bc32a-872e-4fc3-ba2a-de200bc7b758'
17:22:14,856 DEBUG PublisherCallbackChannelImpl:654 - PendingConfirms cleared
17:22:14,856 DEBUG RabbitAdmin:511 - Binding destination [07475112-7be0-4c8d-b6e5-3279e81a1aff (QUEUE)] to exchange [serviceAExchange1] with routing key []
17:22:14,858 ERROR CachingConnectionFactory:292 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '07475112-7be0-4c8d-b6e5-3279e81a1aff' in vhost '/', class-id=50, method-id=10)
17:22:14,859 DEBUG CachingConnectionFactory:673 - Detected closed channel on exception.  Re-initializing: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,860 DEBUG RabbitAdmin:511 - Binding destination [e4825008-954f-4154-bd61-9b67e0ff582e (QUEUE)] to exchange [serviceAExchange2] with routing key []
17:22:14,862 DEBUG RabbitAdmin:511 - Binding destination [001bd357-6203-49a6-b573-7f1a998ff750 (QUEUE)] to exchange [serviceBExchange1] with routing key []
17:22:14,864  WARN RabbitAdmin:494 - Failed to declare queue:Queue [name=07475112-7be0-4c8d-b6e5-3279e81a1aff, durable=false, autoDelete=true, exclusive=true, arguments=null], continuing...
java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        [...]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '07475112-7be0-4c8d-b6e5-3279e81a1aff' in vhost '/', class-id=50, method-id=10)
[ More declaration errors follow ]

1.4.6快2岁了;您在当前版本 (1.7.4) 中看到相同的行为吗?

异常reply-code=405, reply-text=RESOURCE_LOCKED 意味着队列中有两个消费者。 Tomcat(或任何 Web 容器)上的 Spring 应用程序的一个常见错误是加载应用程序上下文两次 - 一次在根应用程序上下文中,另一次在调度程序 servlet 的上下文中。

org.springframework 打开 DEBUG 日志记录并检查所有 bean 声明应该会有所帮助。

我刚刚将您的配置加载到一个简单的应用程序(主)中,我没有发现任何问题,即使在通过管理员强制关闭连接后也是如此 UI...

Auto-declaring a non-durable, auto-delete, or exclusive Queue (47c551b8-c290-4ff0-ae42-11f24d576399) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
Auto-declaring a non-durable, auto-delete, or exclusive Queue (c494990e-9069-4561-8d64-1eb0373cf681) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
Started So46496960Application in 1.454 seconds (JVM running for 1.879)
#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
SimpleConnection@5fa6541 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58388], acknowledgeMode=AUTO local queue size=0
SimpleConnection@5fa6541 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58388], acknowledgeMode=AUTO local queue size=0
rabbitConnectionFactory#4015e40b:1/SimpleConnection@712becf3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58404]
Auto-declaring a non-durable, auto-delete, or exclusive Queue (47c551b8-c290-4ff0-ae42-11f24d576399) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
Auto-declaring a non-durable, auto-delete, or exclusive Queue (c494990e-9069-4561-8d64-1eb0373cf681) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.