How to publish an AMQP message with Spring to a parking lot queue after a loop through a dead letter queue with TTL?
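I want to implement the following scenario in my application:
1. If a business error occurs, the message should be sent from the incomingQueue to the deadLetter queue and delayed there for 10 seconds.
2. Step 1 should be repeated 3 times.
3. The message should then be published to the parkingLot queue.
I am able (see the code below) to delay the message for a certain amount of time in the dead letter queue. The message then loops indefinitely between the incoming queue and the deadLetter queue. So far so good.
Main question: How can I intercept the process and manually route the message (as described in step 3) to the parkingLot queue for later analysis?
Secondary question: Can I achieve the same flow with only one exchange?
Here is a simplified version of my two classes:
Configuration class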
@Configuration
public class MailRabbitMQConfig {

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(incomingQueueName)
                .withArgument(
                        "x-dead-letter-exchange",
                        dlExchange().getName()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(parkingLotQueueName);
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with("#");
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(deadLetterQueueName)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", incomingExchange()
                        .getName())
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with("#");
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
                .to(dlExchange)
                .with(parkingLotRoutingKeyName);
    }
}
Consumer class
@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {
        try {
            // business logic here
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("Failed to handle a business logic");
        }
        return Boolean.TRUE;
    }
}
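I know that I can define an additional listener for the dead letter queue in the consumer class, like this:
@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
    // Logic to count x-retries header property value and send a failed message manually
    // to the parkingLot Queue
}
However, this does not work as expected, because the listener is invoked as soon as the message reaches the head of the dead letter queue, without being delayed.
Thanks in advance.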
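Edit: @ArtemBilan and @GaryRussell helped me solve the problem. The main hints for the solution are in their comments on the accepted answer. Thanks for your help! Below are the updated configuration and consumer classes that implement the messaging flow. The main changes were:
- The definition of the routes between incoming exchange -> incoming queue and dead letter exchange -> dead letter queue in the MailRabbitMQConfig class.
- The loop handling, with manual publishing of the message to the parking lot queue, in the Consumer class.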
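To make the resulting loop concrete, here is a small plain-Java simulation of a message whose business logic always fails. It is only a sketch: the class name `RetryLoopSketch` and the event strings are hypothetical, and it assumes the broker raises the x-death count by one on every round trip through the dead letter queue, while the consumer parks the message once that count reaches the configured retry count.

```java
import java.util.ArrayList;
import java.util.List;

public final class RetryLoopSketch {

    private RetryLoopSketch() {
    }

    /** Replays the flow for a message that fails on every delivery. */
    public static List<String> simulate(int retryCount) {
        List<String> events = new ArrayList<>();
        long deathCount = 0; // the first delivery carries no x-death header
        int attempt = 0;
        while (true) {
            attempt++;
            if (deathCount >= retryCount) {
                // Retry budget used up: publish to the parking lot and stop.
                events.add("attempt " + attempt + ": parked");
                return events;
            }
            // Reject without requeue: the message waits in the dead letter
            // queue until its TTL expires, then returns to the incoming queue.
            events.add("attempt " + attempt + ": rejected, delayed in dead letter queue");
            deathCount++; // assumed: one increment per dead-lettering round trip
        }
    }
}
```

With a retry count of 3 the message is attempted four times in total: three delayed retries, then the parking lot.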
Configuration class
@Configuration
public class MailRabbitMQConfig {

    @Autowired
    public MailConfigurationProperties properties;

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                        dlExchange().getName()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                        properties.getRabbitMQ().getRoutingKey().getDeadLetter()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with(properties.getRabbitMQ().getRoutingKey().getIncoming());
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(properties.getRabbitMQ().getQueue().getDeadLetter())
                .withArgument(
                        properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
                        properties.getRabbitMQ().getMessages().getDelayTime()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                        incomingExchange().getName()
                )
                .withArgument(
                        properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                        properties.getRabbitMQ().getRoutingKey().getIncoming()
                )
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
                .to(dlExchange)
                .with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
    }
}
Consumer class
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    public MailConfigurationProperties properties;

    @Autowired
    protected EmailClient mailJetEmailClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(
            @Payload MailDataExternalTemplate mailDataExternalTemplate,
            Message amqpMessage
    ) {
        logger.info("Received message");
        try {
            final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);
            mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
            logger.info("Successfully sent an E-Mail");
        } catch (Exception e) {
            int count = getXDeathCountFromHeader(amqpMessage);
            logger.debug("x-death count: {}", count);
            // Retries exhausted: publish to the parking lot and acknowledge the message.
            if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
                this.rabbitTemplate.send(
                        properties.getRabbitMQ().getExchange().getDeadletter(),
                        properties.getRabbitMQ().getRoutingKey().getParkingLot(),
                        amqpMessage
                );
                return Boolean.TRUE;
            }
            // Otherwise reject without requeue so the broker dead-letters the message.
            throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail");
        }
        return Boolean.TRUE;
    }

    private int getXDeathCountFromHeader(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        Object header = headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
        if (header == null) {
            return 0;
        }
        // Cast to the List interface rather than ArrayList; the first entry is the most recent death.
        @SuppressWarnings("unchecked")
        List<Map<String, ?>> xDeath = (List<Map<String, ?>>) header;
        Number count = (Number) xDeath.get(0).get("count");
        return count.intValue();
    }
}
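The helper above reads only the first x-death entry. As a more defensive variant, the following sketch looks up the entry for one specific queue and reason. It is plain Java with no Spring dependency; the class name `XDeathCounter` is hypothetical, and the entry keys `queue`, `reason` and `count` are the ones RabbitMQ writes into the `x-death` header.

```java
import java.util.List;
import java.util.Map;

public final class XDeathCounter {

    private XDeathCounter() {
    }

    /**
     * Returns how often the message was dead-lettered from the given queue
     * for the given reason, or 0 if there is no matching x-death entry.
     */
    public static long count(Map<String, Object> headers, String queue, String reason) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0L; // first delivery: header not present yet
        }
        for (Object entry : (List<?>) raw) {
            if (!(entry instanceof Map)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            // String.valueOf also copes with non-String header value types.
            boolean sameQueue = queue.equals(String.valueOf(death.get("queue")));
            boolean sameReason = reason.equals(String.valueOf(death.get("reason")));
            Object count = death.get("count");
            if (sameQueue && sameReason && count instanceof Number) {
                return ((Number) count).longValue();
            }
        }
        return 0L;
    }
}
```

In the listener, a call such as `XDeathCounter.count(amqpMessage.getMessageProperties().getHeaders(), incomingQueueName, "rejected")` could stand in for `getXDeathCountFromHeader(amqpMessage)`, where `incomingQueueName` is a placeholder for the configured incoming queue name.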
To delay a message before it becomes available in a queue, you should consider using a DelayedExchange: https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/_reference.html#delayed-message-exchange.
As for sending the message to the parkingLot queue manually, that is easy to do with a RabbitTemplate, using the queue name as the routing key:
/**
 * Send a message to a default exchange with a specific routing key.
 *
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void send(String routingKey, Message message) throws AmqpException;
Every queue is bound to the default exchange with its name as the routing key.