spring amqp 自定义 TTL 和重试次数
spring amqp custom TTL and retry count
我们正在尝试实现客户端异常的重试机制。我们希望能够根据每条消息中的内容设置不同的路由键、ttl和重试次数。我们希望使处理程序保持简单,即;用于 handleMessage 抛出异常。我们如何处理这个异常并使用适当的参数将消息发送到 DLX。如果失败再次发生,则重试 - 消息将被丢弃(确认),或者将通过增加重试计数而放回 DLX。我们将在哪里实现这个逻辑以及如何连接?
========================
在加里的指导下,我得以实施。以下是摘录..
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
jsonMessageHandler.setQueueName(queueName);
container.setQueueNames(queueName);
container.setMessageListener(jsonMessageListenerAdapter());
container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(messageHandler,messageConverter);
}
@Bean
public MessageListenerAdapter jsonMessageListenerAdapter() {
return new MessageListenerAdapter(jsonMessageHandler);
}
@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless().recoverer(republishMessageRecoverer).maxAttempts(1).build();
}
@Bean
RepublishMessageRecoverer republishMessageRecoverer() {
return new MyRepublishMessageRecoverer(rabbitTemplate());
}
==========
public class MyRepublishMessageRecoverer extends RepublishMessageRecoverer {
// - constructor
@Override
public void recover(Message message, Throwable cause) {
//Deal with headers
long currentCount = 0;
List xDeathList = (List)message.getMessageProperties().getHeaders().get("x-death");
if(xDeathList != null && xDeathList.size() > 0) {
currentCount = (Long)((Map)(xDeathList.get(0))).get("count");
}
if(currentCount < context.getRules().getNumberOfRetries()) {
//message sent to DLX
this.retryTemplate.send(handlerProperties.getSystem(), message);
} else {
//message ignored
}
throw new AmqpRejectAndDontRequeueException(cause);
}
}
您无法修改被拒绝的消息,它会原封不动地路由到 DLX/DLQ(除了 x-death
headers 是由代理添加的)。
如果您想更改邮件属性,您必须自己重新发布到 DLX/DLQ。
您可以使用 Spring Retry with a customized RepublishMessageRecoverer 来执行此操作。
我们正在尝试实现客户端异常的重试机制。我们希望能够根据每条消息中的内容设置不同的路由键、ttl和重试次数。我们希望使处理程序保持简单,即;用于 handleMessage 抛出异常。我们如何处理这个异常并使用适当的参数将消息发送到 DLX。如果失败再次发生,则重试 - 消息将被丢弃(确认),或者将通过增加重试计数而放回 DLX。我们将在哪里实现这个逻辑以及如何连接?
========================
在加里的指导下,我得以实施。以下是摘录..
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
jsonMessageHandler.setQueueName(queueName);
container.setQueueNames(queueName);
container.setMessageListener(jsonMessageListenerAdapter());
container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(messageHandler,messageConverter);
}
@Bean
public MessageListenerAdapter jsonMessageListenerAdapter() {
return new MessageListenerAdapter(jsonMessageHandler);
}
@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless().recoverer(republishMessageRecoverer).maxAttempts(1).build();
}
@Bean
RepublishMessageRecoverer republishMessageRecoverer() {
return new MyRepublishMessageRecoverer(rabbitTemplate());
}
==========
public class MyRepublishMessageRecoverer extends RepublishMessageRecoverer {
// - constructor
@Override
public void recover(Message message, Throwable cause) {
//Deal with headers
long currentCount = 0;
List xDeathList = (List)message.getMessageProperties().getHeaders().get("x-death");
if(xDeathList != null && xDeathList.size() > 0) {
currentCount = (Long)((Map)(xDeathList.get(0))).get("count");
}
if(currentCount < context.getRules().getNumberOfRetries()) {
//message sent to DLX
this.retryTemplate.send(handlerProperties.getSystem(), message);
} else {
//message ignored
}
throw new AmqpRejectAndDontRequeueException(cause);
}
}
您无法修改被拒绝的消息,它会原封不动地路由到 DLX/DLQ(除了 x-death
headers 是由代理添加的)。
如果您想更改邮件属性,您必须自己重新发布到 DLX/DLQ。
您可以使用 Spring Retry with a customized RepublishMessageRecoverer 来执行此操作。