Spring amqp 拒绝监听器外的消息
Spring amqp reject message outside a listener
该应用程序使用 java10、spring amqp 和 rabbitmq。
系统有一个死信 queue 我们发送了一些消息(由于数据库不可用,它们无法按预期处理)。
目前,数据库可用性每 X 秒检查一次,如果仅可用,我们将 re-queue 消息发送到其原始 queue。否则我们什么都不做,消息会保留在死信中 queue.
当 re-queued 到原始 queue 时,消息可以再次返回到死信 queue 并看到 x-death header 计数增长。
出于某些原因,我们希望处理 dead-lettered 个计数 >= 5(例如)的消息,并将其他 re-queue 个消息处理为死信 queue。
我需要先基本确认消息以检查 x-death 计数 header,如果计数足够大,然后将其发送到原始 queue,否则 re-queue 在死信中 queue.
我无法 re-queue 变成死信 queue 因为基本进入不在侦听器内部:抛出 AmqpRejectAndDontRequeueException 不起作用,因为异常是没有被扔进 rabbitmq 侦听器 object.
我尝试在 receiveAndCallback 方法中抛出异常,但这似乎并不好:
rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {
@Override
public Object handle(Message message) {
Long messageXdeathCount = null;
if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
"x-death");
if (null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
resendsMessage(message);
} else {
// this does not reject the message
throw new AmqpRejectAndDontRequeueException("rejected");
}
return null;
}
});
return receive;
}
执行此方法后,消息没有像我预期的那样被拒绝,并且远离queue(已被确认)。
这里是兑换和queue声明:
@Bean
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
admin().declareExchange(exchange);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
Queue queue = new Queue("queueName", true, false, false, args);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
admin().declareBinding(binding);
return exchange;
}
如何在不使用 AmqpRejectAndDontRequeueException 的情况下拒绝死信 queue 中的消息?
交易所是否可以将 x-dead-letter-exchange 设置为 self?
感谢您的帮助
更新
我尝试了另一种方式,使用通道获取和拒绝:
// exchange creation
@Bean
public Exchange exchange() throws IOException {
Connection connection = connectionFactory().createConnection();
Channel channel = channel();
channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
channel.queueDeclare("queueName", true, false, false, args);
channel.queueBind("queueName", EXCHANGE, routingKey);
return exchange;
}
消息获取并确认或拒绝:
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps =
messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if(response.getProps().getHeaders().get("x-death") == null) {
response.getProps().getHeaders().put("x-death", new ArrayList<>());
}
if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
}
((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
"count", messageXdeathCount + 1);
channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}
首先我意识到它很丑陋,然后消息不能在get和rejected之间更新。有一种方法可以使用 channel.basicReject 并更新 x-death 计数 header?
receiveAndReply()
方法目前不提供对收到消息的确认的控制。随意开一个New Feature Request.
您可以改用侦听器容器来获得所需的灵活性。
编辑
你可以下拉到rabbitmq API...
rabbitTemplate.execute(channel -> {
// basicGet, basicPublish, ack/nack etc here
});
我可以使用频道的基本方法:
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = 0L;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
for (Map<String, ?> map : xdeathHeader) {
Long count = (Long) map.get("count");
messageXdeathCount += count == null ? 0L : count;
}
}
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
}
我问题更新部分的问题在最后一行:
channel.basicGet(queueName, true);
布尔值指示消息是否应重新排队:如果不重新排队,它将转到交换信件并按预期递增计数 x-death header。布尔值更新为 false 解决了这个问题。
该应用程序使用 java10、spring amqp 和 rabbitmq。
系统有一个死信 queue 我们发送了一些消息(由于数据库不可用,它们无法按预期处理)。
目前,数据库可用性每 X 秒检查一次,如果仅可用,我们将 re-queue 消息发送到其原始 queue。否则我们什么都不做,消息会保留在死信中 queue.
当 re-queued 到原始 queue 时,消息可以再次返回到死信 queue 并看到 x-death header 计数增长。
出于某些原因,我们希望处理 dead-lettered 个计数 >= 5(例如)的消息,并将其他 re-queue 个消息处理为死信 queue。
我需要先基本确认消息以检查 x-death 计数 header,如果计数足够大,然后将其发送到原始 queue,否则 re-queue 在死信中 queue.
我无法 re-queue 变成死信 queue 因为基本进入不在侦听器内部:抛出 AmqpRejectAndDontRequeueException 不起作用,因为异常是没有被扔进 rabbitmq 侦听器 object.
我尝试在 receiveAndCallback 方法中抛出异常,但这似乎并不好:
rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {
@Override
public Object handle(Message message) {
Long messageXdeathCount = null;
if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
"x-death");
if (null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
resendsMessage(message);
} else {
// this does not reject the message
throw new AmqpRejectAndDontRequeueException("rejected");
}
return null;
}
});
return receive;
}
执行此方法后,消息没有像我预期的那样被拒绝,并且远离queue(已被确认)。
这里是兑换和queue声明:
@Bean
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
admin().declareExchange(exchange);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
Queue queue = new Queue("queueName", true, false, false, args);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
admin().declareBinding(binding);
return exchange;
}
如何在不使用 AmqpRejectAndDontRequeueException 的情况下拒绝死信 queue 中的消息? 交易所是否可以将 x-dead-letter-exchange 设置为 self?
感谢您的帮助
更新
我尝试了另一种方式,使用通道获取和拒绝:
// exchange creation
@Bean
public Exchange exchange() throws IOException {
Connection connection = connectionFactory().createConnection();
Channel channel = channel();
channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE);
channel.queueDeclare("queueName", true, false, false, args);
channel.queueBind("queueName", EXCHANGE, routingKey);
return exchange;
}
消息获取并确认或拒绝:
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
}
}
if (messageXdeathCount == null) {
messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps =
messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if(response.getProps().getHeaders().get("x-death") == null) {
response.getProps().getHeaders().put("x-death", new ArrayList<>());
}
if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
}
((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
"count", messageXdeathCount + 1);
channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}
首先我意识到它很丑陋,然后消息不能在get和rejected之间更新。有一种方法可以使用 channel.basicReject 并更新 x-death 计数 header?
receiveAndReply()
方法目前不提供对收到消息的确认的控制。随意开一个New Feature Request.
您可以改用侦听器容器来获得所需的灵活性。
编辑
你可以下拉到rabbitmq API...
rabbitTemplate.execute(channel -> {
// basicGet, basicPublish, ack/nack etc here
});
我可以使用频道的基本方法:
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = 0L;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
List<Map<String, ?>> xdeathHeader =
(List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
if(null != xdeathHeader && null != xdeathHeader.get(0)) {
for (Map<String, ?> map : xdeathHeader) {
Long count = (Long) map.get("count");
messageXdeathCount += count == null ? 0L : count;
}
}
}
if (messageXdeathCount >= 5) {
MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
resendsMessage(new Message(response.getBody(), messageProps));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
}
我问题更新部分的问题在最后一行:
channel.basicGet(queueName, true);
布尔值指示消息是否应重新排队:如果不重新排队,它将转到交换信件并按预期递增计数 x-death header。布尔值更新为 false 解决了这个问题。