AMQP/RabbitMQ - 按顺序处理消息

AMQP/RabbitMQ - Process messages sequentially

我有一个 直接 交易所。还有一个队列,绑定到这个交换器。

我有两个消费者用于该队列。消费者在完成相应的处理后手动确认消息。

消息在逻辑上是 ordered/sorted,应按该顺序处理。是否可以强制所有消息在消费者 A 和消费者 B 之间按顺序接收和处理?也就是说,防止A和B同时处理消息。

注意:消费者共享同一个连接and/or频道。这意味着我不能使用 <channel>.basicQoS(1);.

这个问题的基本原理:两个消费者是相同的。如果一个出现故障,另一个队列将开始处理消息,一切都会继续工作,无需任何干预。

通常 MQ 系统的重点是分配工作负载。当然,在某些情况下,消息N的处理取决于消息N-1的处理结果,甚至是N-1消息本身。

如果 A 和 B 不能同时处理消息,那为什么不只有 A 或者只有 B?正如我所见,拥有 2 个消费者并没有节省任何东西,只有当另一个消费者不工作时,一个消费者才能工作...

在你的情况下,最好有一个消费者,但实际上在处理部分进行并行化(不是一个词)。

只是补充一点,RMQ 会根据任何标准将消息均匀地分发给所有消费者(以循环方式)。当然,这是在 prefetch 设置为 1 时,默认情况下是这样。有关 here 的更多信息,请查找 "fair dispatch"。

在您想要冗余消费者但需要按特定顺序处理消息的情况下,一种处理故障转移的方法是在设置绑定到队列时使用独占消费者选项,并让两个消费者保持即使他们无法获得独占锁也试图绑定。

过程是这样的:

  1. 消费者A先启动,作为独占消费者绑定到队列。消费者 A 开始处理队列中的消息。
  2. Consumer B 接下来开始并尝试作为独占消费者绑定到队列,但被拒绝,因为队列已经有一个独占消费者。
  3. 消费者 B 反复尝试在队列上获得独占绑定但被拒绝。
  4. 托管消费者 A 的进程崩溃。
  5. 消费者B尝试以独占消费者的身份绑定队列,这次成功了。消费者 B 开始处理队列中的消息。
  6. 消费者 A 重新联机,并尝试独占绑定,但现在被拒绝了。
  7. 消费者 B 继续以 FIFO 顺序处理消息。

虽然这种方法不提供负载共享,但它确实提供了冗余。

即使已经回答了这个问题。可能这可以帮助其他人。 ActiveMQ 有一个称为 Single Active Consumer 的功能,它符合您的情况。

我们可以将 N 个消费者附加到队列,但其中只有 1(一)个消费者会主动使用队列中的消息。仅当活动消费者发生故障时才会发生故障转移。

请看一下 link https://www.rabbitmq.com/consumers.html#single-active-consumer

谢谢