spring amqp 自定义 TTL 和重试次数

spring amqp custom TTL and retry count

我们正在尝试实现客户端异常的重试机制。我们希望能够根据每条消息中的内容设置不同的路由键、ttl和重试次数。我们希望使处理程序保持简单,即;用于 handleMessage 抛出异常。我们如何处理这个异常并使用适当的参数将消息发送到 DLX。如果失败再次发生,则重试 - 消息将被丢弃(确认),或者将通过增加重试计数而放回 DLX。我们将在哪里实现这个逻辑以及如何连接?

========================

在加里的指导下,我得以实施。以下是摘录..

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    jsonMessageHandler.setQueueName(queueName);
    container.setQueueNames(queueName);
    container.setMessageListener(jsonMessageListenerAdapter());
    container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});

    return container;
}

@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(messageHandler,messageConverter);
}

@Bean
public MessageListenerAdapter jsonMessageListenerAdapter() {
    return new MessageListenerAdapter(jsonMessageHandler);
}


@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateless().recoverer(republishMessageRecoverer).maxAttempts(1).build();
}

@Bean
RepublishMessageRecoverer republishMessageRecoverer() {
    return new MyRepublishMessageRecoverer(rabbitTemplate());
}

==========

public class MyRepublishMessageRecoverer extends RepublishMessageRecoverer {

// - constructor

@Override
public void recover(Message message, Throwable cause) {
    //Deal with headers

    long currentCount = 0;

    List xDeathList = (List)message.getMessageProperties().getHeaders().get("x-death");
    if(xDeathList != null && xDeathList.size() > 0) {
        currentCount =  (Long)((Map)(xDeathList.get(0))).get("count");
    }

    if(currentCount < context.getRules().getNumberOfRetries()) {
        //message sent to DLX
        this.retryTemplate.send(handlerProperties.getSystem(), message);
    } else {
       //message ignored    
    }

    throw new AmqpRejectAndDontRequeueException(cause);
}

}

您无法修改被拒绝的消息,它会原封不动地路由到 DLX/DLQ(除了 x-death headers 是由代理添加的)。

如果您想更改邮件属性,您必须自己重新发布到 DLX/DLQ。

您可以使用 Spring Retry with a customized RepublishMessageRecoverer 来执行此操作。