如何使用 amqp StatefulRetryOperationsInterceptor

how to use amqp StatefulRetryOperationsInterceptor

我尝试像这样在我的代码中使用 StatefulRetryOperationsInterceptor:

@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor(){
   return RetryInterceptorBuilder.stateful()
          .maxAttempts(5)
          .backOffOptions(1000,2.0,10000)
          .build();
}

 @Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    logger.info("==> custom rabbitmq Listener factory:"+ connectionFactory);
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setPrefetchCount(200);
    factory.setAdviceChan(new Advice[]{
       statefulRetryOperationsInterceptor()
    })  //add retry
    return factory;
}

我的代码 运行 很好,但是当消费者出现异常时,根本不会重试。 那么如何使用StatefulRetryOperationsInterceptor呢? class 是用来捕获异常并重新发送的吗?

如果发生Exception我想重新排队消息并重新发送给消费者5次,然后将消息发送到死队列,如何使用amqp更优雅?


根据@Gary Russell的回答,我是用redis来记录异常的,有没有像StatefulRetryOperationsInterceptor这样更优雅的方法?

try {
     receiveMessage(message);
    channel.basicAck(deliveryTag, false);           
    redisTemplate.opsForHash().delete(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY, messageProperties.getMessageId());
        } 
 catch (Exception e) {     
            if (consumerCount >= MQConstants.MAX_CONSUMER_COUNT) {         
                channel.basicReject(deliveryTag, false);
            } else {
               redisTemplate.opsForHash().increment(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
                        messageProperties.getMessageId(), 1);
                Thread.sleep((long) (Math.pow(MQConstants.BASE_NUM, consumerCount)*1000));
                channel.basicNack(deliveryTag, false, true);
            }
        }

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

由于您使用的是手动确认 - 您只能靠自己;容器帮不了你。您需要使用带状态重试的 AUTO 或使用 channel.basicReject() 重新排队。