延迟处理死信 queue
Handling dead letter queue with delay
我想执行以下操作:当消息失败并落入我的死信 queue 时,我想等待 5 分钟并在我的 queue.[=21] 上重新发布相同的消息=]
今天,使用 Spring Cloud Streams 和 RabbitMQ,我做了以下代码 Based on this documentation:
@Component
public class HandlerDlq {
private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_DELAY_HEADER = "x-delay";
private static final int NUMBER_OF_RETRIES = 3;
private static final int DELAY_MS = 300000;
private RabbitTemplate rabbitTemplate;
@Autowired
public HandlerDlq(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = MessageInputProcessor.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = 0;
}
if (retriesHeader > NUMBER_OF_RETRIES) {
LOGGER.warn("Message {} added to failed messages queue", failedMessage);
this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
}
retriesHeader++;
headers.put(X_RETRIES_HEADER, retriesHeader);
headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
LOGGER.warn("Retrying message, {} attempts", retriesHeader);
this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
}
@Bean
public Queue parkingLot() {
return new Queue(MessageInputProcessor.FAILED);
}
}
我的MessageInputProcessor
界面:
public interface MessageInputProcessor {
String INPUT = "myInput";
String INPUT_DESTINATION = "myInput.group";
String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file
String FAILED = INPUT + "-failed";
String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";
@Input
SubscribableChannel storageManagerInput();
@Input(MessageInputProcessor.FAILED)
SubscribableChannel storageManagerFailed();
}
我的属性文件:
#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group
使用此代码,我可以从死信 queue 中读取,捕获 header 但我无法将其放回我的 queue(行 LOGGER.warn("Retrying message, {} attempts", retriesHeader);
只运行一次,即使我放的时候很慢)。
我的猜测是方法 bindOriginalToDelay
将交换绑定到一个新的 queue,而不是我的。但是,我没有找到一种方法让我的 queue 绑定到那里而不是创建一个新的。但我什至不确定这是不是错误。
我也试过发送到 MessageInputProcessor.INPUT
而不是 MessageInputProcessor.INPUT_DESTINATION
,但它没有按预期工作。
此外,不幸的是,由于对项目的依赖性,我无法更新 Spring 框架...
一段时间后,你能帮我把失败的消息放回我的 queue 吗?我真的不想在那里放 thread.sleep
...
使用该配置,myInput.group
绑定到具有路由键 #
的延迟(主题)交换 myInput
。
您可能应该删除 spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
,因为您不需要延迟主交换。
它也将绑定到您的显式延迟交换,密钥为 myInput.group
。
在我看来一切都是正确的;您应该会看到绑定到两个交换器的相同(单个)队列:
myInput.group.dlq
绑定到 DLX
,密钥为 myInput.group
您应该设置更长的 TTL 并检查 DLQ 中的消息,看看是否有什么突出的地方。
编辑
我刚刚复制了你的代码,延迟了 5 秒,对我来说效果很好(关闭了主交易所的延迟)。
Retrying message, 4 attempts
和
added to failed messages queue
也许你认为它不起作用是因为你在主交易所也有延迟?
我想执行以下操作:当消息失败并落入我的死信 queue 时,我想等待 5 分钟并在我的 queue.[=21] 上重新发布相同的消息=]
今天,使用 Spring Cloud Streams 和 RabbitMQ,我做了以下代码 Based on this documentation:
@Component
public class HandlerDlq {
private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_DELAY_HEADER = "x-delay";
private static final int NUMBER_OF_RETRIES = 3;
private static final int DELAY_MS = 300000;
private RabbitTemplate rabbitTemplate;
@Autowired
public HandlerDlq(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = MessageInputProcessor.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = 0;
}
if (retriesHeader > NUMBER_OF_RETRIES) {
LOGGER.warn("Message {} added to failed messages queue", failedMessage);
this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
}
retriesHeader++;
headers.put(X_RETRIES_HEADER, retriesHeader);
headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
LOGGER.warn("Retrying message, {} attempts", retriesHeader);
this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
}
@Bean
public Queue parkingLot() {
return new Queue(MessageInputProcessor.FAILED);
}
}
我的MessageInputProcessor
界面:
public interface MessageInputProcessor {
String INPUT = "myInput";
String INPUT_DESTINATION = "myInput.group";
String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file
String FAILED = INPUT + "-failed";
String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";
@Input
SubscribableChannel storageManagerInput();
@Input(MessageInputProcessor.FAILED)
SubscribableChannel storageManagerFailed();
}
我的属性文件:
#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group
使用此代码,我可以从死信 queue 中读取,捕获 header 但我无法将其放回我的 queue(行 LOGGER.warn("Retrying message, {} attempts", retriesHeader);
只运行一次,即使我放的时候很慢)。
我的猜测是方法 bindOriginalToDelay
将交换绑定到一个新的 queue,而不是我的。但是,我没有找到一种方法让我的 queue 绑定到那里而不是创建一个新的。但我什至不确定这是不是错误。
我也试过发送到 MessageInputProcessor.INPUT
而不是 MessageInputProcessor.INPUT_DESTINATION
,但它没有按预期工作。
此外,不幸的是,由于对项目的依赖性,我无法更新 Spring 框架...
一段时间后,你能帮我把失败的消息放回我的 queue 吗?我真的不想在那里放 thread.sleep
...
使用该配置,myInput.group
绑定到具有路由键 #
的延迟(主题)交换 myInput
。
您可能应该删除 spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true
,因为您不需要延迟主交换。
它也将绑定到您的显式延迟交换,密钥为 myInput.group
。
在我看来一切都是正确的;您应该会看到绑定到两个交换器的相同(单个)队列:
myInput.group.dlq
绑定到 DLX
,密钥为 myInput.group
您应该设置更长的 TTL 并检查 DLQ 中的消息,看看是否有什么突出的地方。
编辑
我刚刚复制了你的代码,延迟了 5 秒,对我来说效果很好(关闭了主交易所的延迟)。
Retrying message, 4 attempts
和
added to failed messages queue
也许你认为它不起作用是因为你在主交易所也有延迟?