Spring PCF 上 RabbitMQ 的 AMQP 陈旧消费者

Spring AMQP Stale Consumers for RabbitMQ on PCF

我们的团队在使用 cloud foundry 连接器的 Spring AMQP 方面遇到了一些问题。每当网络不稳定,连接失败,然后 AMQP 似乎尝试自动恢复,自动恢复失败,然后创建一个新的消费者。

问题是 RabbitMQ 似乎没有注销旧消费者...所以我们最终有 40 个消费者,而不是通常的 一个 个消费者。而且,我认为,由于无效消费者的数量,旧消费者会收到一些消息,但不会 运行 我们的代码。所以我们有一堆未确认的消息,直到应用程序重新启动并清除消费者列表。

我不确定这是我们的配置错误还是错误。有什么想法吗?

下面的附加信息。

我们日志的一小部分:

a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-Dqik8vz9dSPiUzMbM4uGbw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-t7_vY47weZNYtWagl__pYA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-AK1YOpN0_E2KVwSi4fzOaw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.822 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler     : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-vrWhRdhk3ejrXo2-ImjXzA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:244) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     ... 7 common frames omitted
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access[=11=]0(AutorecoveringConnection.java:53) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:673) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]

我们使用@RabbitListener 注释注册我们的消费者

    @RabbitListener(id = CONSUMER_ID, queues = "${CommonInternalEndPoint}", containerFactory = "rabbitLocalContainerFactory")
    protected void process(Object messageObject) {

当我们连接到多个rabbit mq 实例时,我们需要手动声明连接工厂。我们的连接工厂来自 Cloud Foundry 连接器:

@Configuration
@Profile("cloud")
public class RabbitCloudConfiguration {

    @Bean
    public Cloud cloud(){
        return new CloudFactory().getCloud();
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory(){
        return cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
    }

    @Bean("rabbitLocalContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitLocalContainerFactory(){
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory());
        containerFactory.setAutoStartup(true);
        return containerFactory;
    }
}

We are using Spring Boot 1.5.9, Framework 4.3.6 , AMQP 1.7.4, RabbitMQ AMQP Client 4.0.3

从版本 2.0 开始,Spring AMQP 在目标 RabbitMQ ConnectionFactory 上禁用 automaticRecoveryEnabledhttps://docs.spring.io/spring-amqp/docs/2.1.4.RELEASE/reference/#auto-recovery

由于您使用的版本是基于云连接器提供的 RabbitMQ ConnectionFactory,您需要明确禁用它:

@Bean
@Primary
public ConnectionFactory connectionFactory(){
    ConnectionFactory connectionFactory = cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
    ((CachingConnectionFactory) connectionFactory).getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
    return connectionFactory;
}

您的解决方案将基于 Spring AMQP 提供的恢复机制。