处理dead-letterqueuemessage-broker独立方式
Handling dead-letter queue message-broker independent way
我有一个项目目前在下面使用 Spring Cloud Streams 和 RabbitMQ。我实现了一个逻辑 based on the documentation。见下文:
@Component
public class ReRouteDlq {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
它按预期运行,但是,它绑定到 RabbitMQ,我的公司计划在一两年内停止使用此消息代理(不知道为什么,一定是一些疯狂的业务)。所以,我想实现同样的事情,但将它与任何消息代理分离。
我尝试用这种方式更改 rePublish
方法,但它不起作用:
@StreamListener(Sync.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
它失败了,因为 Message
class 有不可变的 Headers - 在 put
尝试时抛出异常说你不能改变它的值(使用 org.springframework.messaging.Message
class).
有没有办法以独立于消息代理的方式实现此 dead-letter queue 处理程序?
使用
MessageBuilder.fromMessage(message)
.setHeader("foo", "bar")
...
.build();
请注意 @StreamListener
中的消息是 spring-messaging Message<?>
,而不是 spring-amqp Message
并且无法发送以这种方式使用模板;您需要一个输出绑定来将消息发送到。
我有一个项目目前在下面使用 Spring Cloud Streams 和 RabbitMQ。我实现了一个逻辑 based on the documentation。见下文:
@Component
public class ReRouteDlq {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
它按预期运行,但是,它绑定到 RabbitMQ,我的公司计划在一两年内停止使用此消息代理(不知道为什么,一定是一些疯狂的业务)。所以,我想实现同样的事情,但将它与任何消息代理分离。
我尝试用这种方式更改 rePublish
方法,但它不起作用:
@StreamListener(Sync.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
它失败了,因为 Message
class 有不可变的 Headers - 在 put
尝试时抛出异常说你不能改变它的值(使用 org.springframework.messaging.Message
class).
有没有办法以独立于消息代理的方式实现此 dead-letter queue 处理程序?
使用
MessageBuilder.fromMessage(message)
.setHeader("foo", "bar")
...
.build();
请注意 @StreamListener
中的消息是 spring-messaging Message<?>
,而不是 spring-amqp Message
并且无法发送以这种方式使用模板;您需要一个输出绑定来将消息发送到。