RabbitMQ:从队列中为多个消费者发送固定消息

RabbitMQ : Dispatch fixed messages from Queue for multiple consumers

有没有一种方法可以限制 RabbitMQ Queue 从 Queue 向消费者发送固定数量的消息?

我有 2 个队列 Q1 和 Q2 以及 10 consumers.Every 消费者可以在任何给定时间处理来自 Q1 和 Q2.At 的消息,只有 2 个消费者应该处理来自 Q2.All 第 10 个的消息消费者可以同时处理来自 Q1 的消息。

RabbitMQ 中是否有任何我们可以指定的配置,以便 RabbitMQ 仅从 Q2 向任何空闲消费者推送 2 条消息,并且仅在它们被确认后才推送下 2 条消息,即使其他消费者空闲并准备消费.

有关此问题的更多背景信息:

为什么一次只处理 2 条消息? : Q2 消息正在进行 Web 服务调用,Web 服务端点(第三方)只能同时服务 2 条消息。

我们不能使用并发吗? : 如果我们使用 ListenerContainer (Spring AMQP),容器是每个消费者。我们可以限制一个消费者一次可以接收多少条消息,但是当我们有 10 个消费者时,如果队列中有消息,每个消费者都会得到它的份额。

我们可以只配置 2 个消费者收听 Q2 吗? : 我知道我们可以通过为 Q2 仅配置 2 个消费者来实现这一点,但我正在努力避免这种情况。如果由于某种原因这 2 个消费者宕机,Q2 的处理将停止。如果配置了 10 个消费者,我们可以保证处理一直进行到最后一个消费者宕机。

正在查看 RabbitMQ 中是否有一些我们可以使用的配置或任何建议的解决方案。

提前致谢!

我很确定 consumer prefetch 会完成您想要的。但是,Q2 只能有一个消费者才能工作。没有办法在多个消费者之间进行协调 - 你必须自己做,并且可以使用 RabbitMQ 来进行协调。


注意: RabbitMQ 团队监控 rabbitmq-users mailing list 并且有时只在 Whosebug 上回答问题。

我认为您已经完全沉浸在问题定义中了。你真正需要的是微不足道的,所以让我们把它分解一下。

给定两个队列,Q1Q2

  • 10 个消费者
  • 每个消费者都可以处理来自 Q1 和 Q2 的消息。
  • 在任何给定时间,只有 2 个消费者应该处理来自 Q2 的消息。
  • 所有 10 个消费者可以同时处理来自 Q1 的消息。

对问题陈述的评论

首先,假定队列是独立的。独立进程 P 将有队列 Q,因此 Q1 服务于进程 P1。这是一个严格的数学要求 - 您不能为单个进程定义两个队列 P.

因此,第二个约束在数学上是不正确的,原因与您无法编写一个有效函数来接受可互换的 stringbool 类型参数的原因相同。它必须接受一个或另一个,因为它们不是兼容的类型,或者它必须接受类型的单个共同祖先而不考虑子类型。这是 Liskov Substitution Principle.

的变体

重新定义问题

系统共有12个消费者:

  • Q1 有 10 个消费者
  • Q2 有 2 个消费者
  • [重要]消费者不在队列之间共享

Is there any configuration in RabbitMQ which we can specify, so that RabbitMQ pushes only 2 messages from Q2 to any free consumer and push the next 2 only after they are acknowledged, even though other consumers are free and ready to consume.

根据问题的新定义,你有两个选择:

  1. 使用 Basic.Get - 消费者处理完最后一条消息后立即从队列中拉出下一条消息。
  2. 使用限制为 1 的 consumer prefetch。这将立即为每个消费者发送第一条和第二条消息,然后在确认该消费者的下一条消息时一次发送一条消息。这有点复杂,但如果您的延迟裕度小于 10 毫秒,则可能有意义。

注意通过正确定义问题space,我们已经消除了试图弄清楚如何确保只有两个消费者在处理的根本问题Q2 随时留言。

试用 3.8+ 版本的新功能 Single Active Consumer。

Single active consumer allows to have only one consumer at a time consuming from a queue and to fail over to another registered consumer in case the active one is cancelled or dies. Consuming with only one consumer is useful when messages must be consumed and processed in the same order they arrive in the queue. Single active consumer can be enabled when declaring a queue, with the x-single-active-consumer argument set to true

https://www.rabbitmq.com/consumers.html#single-active-consumer

e.g. with the Java client: