重新建立与主机的连接后,侦听器不提取数据

Listener doesn't pull data after connection to host was reestablished

首先让我说我刚刚开始涉足 AMQP。

我想 consume/pull 来自队列的数据。我正在使用 Spring 的库 (spring-boot-starter-amqp) 以使事情变得更容易。我有一个侦听器 class,其方法在我设置队列的地方用 @RabbitListener 注释。其他一切都通过属性配置:

    rabbitmq:
      username: user
      password: password
      virtual-host: virtual-host
      port: 5672
      host: host
      queue: _316_
      listener:
        simple:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 8
            max-interval: 10000
            multiplier: 2.0
            stateless: true

一切正常,直到我让主机暂时不可用。当发生这种情况时,连接将被断开并尝试重新建立连接。再次重新建立连接后,侦听器不会开始提取消息。应用程序重新启动后一切都很好,但我确信它可以以某种方式配置为消费者不断重新启动,至少它应该在重新建立连接后尝试这样做(或者至少这是我所期望的)。

连接断开后,可以在日志中找到以下内容:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat 

然后进行连接尝试,我们进入循环:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED 

直到重新建立连接:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]

没有其他事情发生 - 没有消息被消费。

更新: 遵循建议并打开 DEBUG 日志记录。

应用启动时:

  1. 正在启动侦听器容器
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
  1. 正在创建连接

  2. 开始消费

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
  1. 正在创建频道并开始使用
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])

这一直持续到连接中断:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

频道已关闭,用户以某种方式被删除,如日志所述:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

连接驱动程序出现问题,随后抛出异常:

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835

消费者提出异常,它说处理可以 r-e-s-t-a-r-t

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it

重述发生:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

正在关闭频道:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)

新消费者开始:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

我们正在尝试连接,但以 WARN 和 AUTHENTICATION 失败告终,(因为之前的日志说该用户已被删除?):

An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat

尝试启动的消费者:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup

它(消费者)被取消了:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

通道已关闭,容器正在停止:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

然后容器正在关闭:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container

我们正在等待 workers 完成,它成功了,然后我们尝试再次连接,同样的 SOCKET_CLOSED 被一遍又一遍地记录。

然后主机恢复并重新建立连接。正在创建缓存的 Rabit 通道,但没有任何反应。

我假设问题是容器已关闭并且再也没有恢复生机,因此没有消费者。

什么有效

我创建了一个 class,它有一个 "listenning" 方法接受 ListenerContainerConsumerFailedEvent。 class 具有 RabbitListenerEndpointRegistry(Boot 方便地为我创建的 bean),每当调用该方法时,我都会检查 listenerContainer 是否为 运行,如果不是,那么我将启动它(该检查很可能是多余的).

@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
        if (!listenerContainer.isRunning()){
            listenerContainer.start();
        }
    }

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure

FatalListenerStartupException

身份验证失败被认为是致命的,容器会立即停止;这种情况不太可能自动纠正。

删除当前正在使用的用户是一种非常不寻常的情况。

您可以使用 ApplicationListener bean 或 @EventListener 方法来侦听 ListenerContainerConsumerTerminatedEvent 并在一段时间后尝试重新启动容器。