在 RabbitMQ 中处理死信

Dealing with dead letters in RabbitMQ

TL;DR:一旦我修复了最初导致消息被拒绝的消费者代码,我就需要 "replay" 将死信消息恢复到原来的 queues。

我已经为 RabbitMQ 配置了死信交换 (DLX),并且成功地将拒绝的消息路由到死信 queue。但现在我想查看死信 queue 中的消息,并尝试决定如何处理它们中的每一个。这些消息中的一些(很多?)应该被重播(重新queued)到他们原来的 queues(在 "x-death" headers 中可用)一旦有问题的消费者代码已经已修复。但是我实际上该怎么做呢?我是否应该编写一个 one-off 程序来从死信 queue 中读取消息并允许我指定目标 queue 以将它们发送到?那么搜索死信 queue 呢?如果我知道一条消息(假设它以 JSON 编码)具有我想要搜索和重播的特定属性怎么办?例如,我修复了一个缺陷,我知道该缺陷将允许 PacketId: 1234 的消息现在成功处理。我想我也可以为此编写一个 one-off 程序。

我当然不是第一个遇到这些问题的人,我想知道是否有其他人已经解决了这些问题。似乎应该有某种瑞士军刀来处理这种事情。我在 Google 和 Stack Overflow 上进行了相当广泛的搜索,但并没有真正找到太多。我能找到的最接近的东西是铲子,但这似乎并不是完成这项工作的正确工具。

Should I write a one-off program that reads messages from the dead letter queue and allows me to specify a target queue to send them to?

一般来说,是的。

您可以设置延迟 re-try 以将消息重新发送回原始 queue,使用 delay message exchange plugin.

的组合

但这只会在一定间隔内自动重试,您可能没有在重试发生之前解决问题。

在某些情况下这没问题 - 例如错误是由暂时不可用的外部资源引起的。

不过,就您而言,我相信您关于创建一个应用程序来处理死信的想法是最好的方法,原因有以下几个:

  • 您需要搜索消息,这是不可能的 RMQ
  • 这意味着您需要一个数据库来存储来自 DLX/queue
  • 的消息

因为您要从 DLX/queue 中提取消息,所以您需要确保从消息中获取所有 header 信息,以便您可以 re-publish到时候正确的queue。

I certainly can't be the first one to encounter these problems and I'm wondering if anyone else has already solved them.

而你不是!

这个问题有很多解决方案,都归结为您建议的解决方案。

一些较大的 "service bus" 实现内置了此类功能。例如,我相信 NServiceBus(或它的 SaaS 版本)内置了这个 - 尽管我不是 100% 确定它。

如果您想进一步研究这个问题,请搜索术语 "poison message" - 这通常是用于这种情况的术语。通过快速搜索,我在 google 上找到了一些东西,它们可能会帮助您继续前进:

希望对您有所帮助!