在 RabbitMQ 中处理死信
Dealing with dead letters in RabbitMQ
TL;DR:一旦我修复了最初导致消息被拒绝的消费者代码,我就需要 "replay" 将死信消息恢复到原来的 queues。
我已经为 RabbitMQ 配置了死信交换 (DLX),并且成功地将拒绝的消息路由到死信 queue。但现在我想查看死信 queue 中的消息,并尝试决定如何处理它们中的每一个。这些消息中的一些(很多?)应该被重播(重新queued)到他们原来的 queues(在 "x-death" headers 中可用)一旦有问题的消费者代码已经已修复。但是我实际上该怎么做呢?我是否应该编写一个 one-off 程序来从死信 queue 中读取消息并允许我指定目标 queue 以将它们发送到?那么搜索死信 queue 呢?如果我知道一条消息(假设它以 JSON 编码)具有我想要搜索和重播的特定属性怎么办?例如,我修复了一个缺陷,我知道该缺陷将允许 PacketId: 1234 的消息现在成功处理。我想我也可以为此编写一个 one-off 程序。
我当然不是第一个遇到这些问题的人,我想知道是否有其他人已经解决了这些问题。似乎应该有某种瑞士军刀来处理这种事情。我在 Google 和 Stack Overflow 上进行了相当广泛的搜索,但并没有真正找到太多。我能找到的最接近的东西是铲子,但这似乎并不是完成这项工作的正确工具。
Should I write a one-off program that reads messages from the dead letter queue and allows me to specify a target queue to send them to?
一般来说,是的。
您可以设置延迟 re-try 以将消息重新发送回原始 queue,使用 delay message exchange plugin.
的组合
但这只会在一定间隔内自动重试,您可能没有在重试发生之前解决问题。
在某些情况下这没问题 - 例如错误是由暂时不可用的外部资源引起的。
不过,就您而言,我相信您关于创建一个应用程序来处理死信的想法是最好的方法,原因有以下几个:
- 您需要搜索消息,这是不可能的 RMQ
- 这意味着您需要一个数据库来存储来自 DLX/queue
的消息
因为您要从 DLX/queue 中提取消息,所以您需要确保从消息中获取所有 header 信息,以便您可以 re-publish到时候正确的queue。
I certainly can't be the first one to encounter these problems and I'm wondering if anyone else has already solved them.
而你不是!
这个问题有很多解决方案,都归结为您建议的解决方案。
一些较大的 "service bus" 实现内置了此类功能。例如,我相信 NServiceBus(或它的 SaaS 版本)内置了这个 - 尽管我不是 100% 确定它。
如果您想进一步研究这个问题,请搜索术语 "poison message" - 这通常是用于这种情况的术语。通过快速搜索,我在 google 上找到了一些东西,它们可能会帮助您继续前进:
- http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2013-January/025019.html
- https://web.archive.org/web/20170809194056/http://tafakari.co.ke/2014/07/rabbitmq-poison-messages/
- https://web.archive.org/web/20170809170555/http://kjnilsson.github.io/blog/2014/01/30/spread-the-poison/
希望对您有所帮助!
TL;DR:一旦我修复了最初导致消息被拒绝的消费者代码,我就需要 "replay" 将死信消息恢复到原来的 queues。
我已经为 RabbitMQ 配置了死信交换 (DLX),并且成功地将拒绝的消息路由到死信 queue。但现在我想查看死信 queue 中的消息,并尝试决定如何处理它们中的每一个。这些消息中的一些(很多?)应该被重播(重新queued)到他们原来的 queues(在 "x-death" headers 中可用)一旦有问题的消费者代码已经已修复。但是我实际上该怎么做呢?我是否应该编写一个 one-off 程序来从死信 queue 中读取消息并允许我指定目标 queue 以将它们发送到?那么搜索死信 queue 呢?如果我知道一条消息(假设它以 JSON 编码)具有我想要搜索和重播的特定属性怎么办?例如,我修复了一个缺陷,我知道该缺陷将允许 PacketId: 1234 的消息现在成功处理。我想我也可以为此编写一个 one-off 程序。
我当然不是第一个遇到这些问题的人,我想知道是否有其他人已经解决了这些问题。似乎应该有某种瑞士军刀来处理这种事情。我在 Google 和 Stack Overflow 上进行了相当广泛的搜索,但并没有真正找到太多。我能找到的最接近的东西是铲子,但这似乎并不是完成这项工作的正确工具。
Should I write a one-off program that reads messages from the dead letter queue and allows me to specify a target queue to send them to?
一般来说,是的。
您可以设置延迟 re-try 以将消息重新发送回原始 queue,使用 delay message exchange plugin.
的组合但这只会在一定间隔内自动重试,您可能没有在重试发生之前解决问题。
在某些情况下这没问题 - 例如错误是由暂时不可用的外部资源引起的。
不过,就您而言,我相信您关于创建一个应用程序来处理死信的想法是最好的方法,原因有以下几个:
- 您需要搜索消息,这是不可能的 RMQ
- 这意味着您需要一个数据库来存储来自 DLX/queue 的消息
因为您要从 DLX/queue 中提取消息,所以您需要确保从消息中获取所有 header 信息,以便您可以 re-publish到时候正确的queue。
I certainly can't be the first one to encounter these problems and I'm wondering if anyone else has already solved them.
而你不是!
这个问题有很多解决方案,都归结为您建议的解决方案。
一些较大的 "service bus" 实现内置了此类功能。例如,我相信 NServiceBus(或它的 SaaS 版本)内置了这个 - 尽管我不是 100% 确定它。
如果您想进一步研究这个问题,请搜索术语 "poison message" - 这通常是用于这种情况的术语。通过快速搜索,我在 google 上找到了一些东西,它们可能会帮助您继续前进:
- http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2013-January/025019.html
- https://web.archive.org/web/20170809194056/http://tafakari.co.ke/2014/07/rabbitmq-poison-messages/
- https://web.archive.org/web/20170809170555/http://kjnilsson.github.io/blog/2014/01/30/spread-the-poison/
希望对您有所帮助!