扇出交换上的 RabbitMQ 重试逻辑

RabbitMQ retry logic on fanout exchange

当前系统围绕 RabbitMQ

我们有一组队列和交换来支持一条消息。

  1. Main exchange:这是从消息生产者那里接收消息并进行第一次处理的地方。这可以是主题或扇出(当前问题是关于扇出)
  2. 主队列:这是消费者从中挑选消息进行处理的队列。
  3. 死交换和队列:坏消息的简单基本设置。
  4. 延迟队列:如果需要重试,这是从消费者那里获取消息的队列。此队列中的消息具有特定的 ttl,此队列的死交换是“主交换”。没有人听这个队列,消息只是等待过期并移动到“主交换”。
  5. Error Queue: 重试后仍无法处理的消息放这里。同样,目前没有人听他们的。

上述设置的问题场景:

假设我有一个扇出“foo-exchange”,它将消息发送到队列“bar”和“baz”。假设有一条消息传入,它是有效的,“bar”成功处理了它,但“baz”由于某种原因失败了(可能是外部服务已关闭),我们想在 5 分钟后重试。来自“baz”的消息被发送回“foo-exchange”(通过延迟队列),然后它不仅将其发送回“baz”,还发送回“bar”。

当前实施的解决方案(我们想要比这更好的解决方案!)

我们每个队列都有一个交换器,用于在重试超时期限后将重试的消息发送回特定队列。

在这种情况下,我们有 3 个交换(“foo-exchange”、“foo-exchange-dead”、“Baz-exchange-retry(每个队列 1 个到扇出交换)”和 3 个队列(“baz-队列”、“baz-queue-delay”、“baz-queue-error”和整个交换的 1 个死队列(“foo-queue-dead”)。

此设置适用于 1 个队列进行扇出,并且会显着增加扇出交换具有多个消费者队列的情况。

所以,我们需要一个解决方案,可以将这个复杂的设置减少到一些可管理的队列和交换中

我们已经调查过的事情:

  1. x-delay-exchange:这对我们来说不是一个好的解决方案,因为这并不能说明有多少消息正在等待再次处理。我们需要知道在发生外部故障时要重试多少次。 (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)
  2. Message ttl to main queue: 这会阻止延迟消息后面的所有消息。

其中一个选项是在将消息放入延迟队列之前用目标详细信息装饰消息,然后当消息等待时间到期时,消息被解包以获得目标和原始消息并发送到目标。

每个队列也绑定到默认交换器 (""),路由键等于队列名称。

您可以简单地将过期的死信直接路由到原始队列。