如何在 RabbitMQ Exchange 中将旧消息排入新队列?

How to enqueue old messages into a new queue in RabbitMQ Exchange?

我有一个 topic 类型的交换,它只将消息重定向到队列 payments

以后的某个时候,我会决定再添加一个队列payment_analyze来分析所有已经入队的新旧消息。

持久交换和队列在 rabbit MQ 重启后仍然存在,持久消息被写入磁盘但是当将新队列绑定到旧的持久交换时,旧消息不会被重定向(只有新消息会被重定向)

根据我的理解,这是预期的行为,因为交换不存储消息而仅充当“代理”

我该如何实现?

可能的解决方案

创建一个名为 parking 的队列并将每条排队的消息添加到其中,每当添加新队列时,使用来自 parking 的消息而不确认以保持新队列“半”最新.

即使您在 payments 队列上配置了持久消息,这仅意味着消息将在代理重启后继续存在 - 一旦消息被使用并确认,它将被删除。

如果您知道在将来的某个时候您将需要 payment_analyze 队列,是否可以仅预先创建此 queue/binding 并将消息路由到两个 payment_analyzepaymentspayment_analyze 上的消息会堆积起来,直到您准备好开始使用它们。注意:如果您要生成大量消息,此方法可能会导致存储问题...

作为替代方案,您可以将消息写入 BLOB 存储(或其他一些数据存储)作为 payments 队列消费者(或完全不同的 queue/consumer)的一部分,然后当您准备好引入 payment_analyze 队列,您可以编写一个脚本来从 BLOB 存储中读取所有旧消息并将它们发送到 RabbitMQ 交换器。使用 'topic' 交换 - 请参阅 here - 您可能会在队列绑定中使用通配符和路由键来确保旧消息(来自 BLOB 存储)和新消息都被路由到 payment_analyze 队列,但只有新消息被路由到 payments 队列(这样您的 payments 队列消费者就不会重新处理旧消息)。

另一种选择(假设您没有过度投资于 RabbitMQ)可能是考虑使用 Apache Kafka,它可以很好地处理这种情况,因为一旦消息被订户。

无论如何,只需要考虑几个选项...