spring amqp rabbit 最大消费者连接重试次数

spring amqp rabbit max consumer connection retries

我正在尝试确定从我的应用程序到 Rabbit 代理的最大重试次数。 我有重试拦截器,

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(CommonConstants.MAX_AMQP_RETRIES)
            .backOffOptions(500, 2.0, 3000)
            .build();
}

这在侦听器容器中使用,

container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});

但是,在重试几次之后,消费者会在无限循环中重新尝试连接,

2017-02-21 15:03:12.229  WARN 9292 --- [nsumerThread_92]  o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing  can restart if the connection factory supports it. Exception summary:  org.springframework.amqp.AmqpConnectException: java.net.ConnectException:    Connection refused: connect
2017-02-21 15:03:12.229  INFO 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-02-21 15:03:13.245  WARN 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2017-02-21 15:03:13.245  INFO 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-02-21 15:03:13.261 ERROR 9292 --- [nsumerThread_83] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

我希望应用程序因在 MAX_RETRY # 限制后无法连接到代理而失败并出错。

感谢帮助

EDIT

根据@artem-bilan 的建议,我最终使用了 Component

public class BrokerFailureEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent>

在这个class中onApplicationEvent我统计了失败的次数然后采取适当的行动。

如果是生产者端,则情况略有不同。正如@artem-bilan 所解释的那样,应用程序需要处理任何问题。我探索了使用 netflix-hystrix 并为生产方法添加了 fallback 方法,并将采用该路线。再次感谢。

嗯,你误会了一点container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});。针对消息处理过程中的业务错误:

Business exception handling, as opposed to protocol errors and dropped connections, might need more thought and some custom configuration, especially if transactions and/or container acks are in use. Prior to 2.8.x, RabbitMQ had no definition of dead letter behaviour, so by default a message that is rejected or rolled back because of a business exception can be redelivered ad infinitum. To put a limit in the client on the number of re-deliveries, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener. The interceptor can have a recovery callback that implements a custom dead letter action: whatever is appropriate for your particular environment.

与以下内容相矛盾:

In fact it loops endlessly trying to restart the consumer, and only if the consumer is very badly behaved indeed will it give up. One side effect is that if the broker is down when the container starts, it will just keep trying until a connection can be established.

你需要的是 ListenerContainerConsumerFailedEvent,它被发出为:

private void logConsumerException(Throwable t) {
        if (logger.isDebugEnabled()
                || !(t instanceof AmqpConnectException  || t instanceof ConsumerCancelledException)) {
            logger.warn(
                    "Consumer raised exception, processing can restart if the connection factory supports it",
                    t);
        }
        else {
            logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
                    + "Exception summary: " + t);
        }
        publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
    }

因此,您可以监听这些事件并在达到某些条件时停止您的应用程序。