RabbitMQ 跨多个队列的多个消费者 - 消息被延迟处理
RabbitMQ multiple consumers across multiple queues - messages delayed from being processed
我们最近遇到了由 RabbitMQ 提供支持的应用程序的意外行为。
RabbitMQ 版本是 3.6.12,我们使用的是 .NET Client 5.0.1
应用程序订阅了两个队列,一个用于命令,另一个用于事件——我们也使用手动确认。
我们的应用程序配置为有 7 个消费者。每个都有自己的通道(IModel),每个都有自己的 EventingBasicConsumer
我们最终在 EventingBasicConsumer.Received 被触发时处理消息。
我们的应用程序必须在消息被路由到队列时尽可能接近地处理消息,到目前为止我们还没有遇到问题。
然而最近,我们发现当我们正在处理的一条消息需要很长时间才能完成时,它会延迟处理另一条消息,尽管有许多可用的消费者 (6) 并不忙。
请注意,我们观察到当应用程序仅订阅单个队列时不会发生此问题,当涉及多个队列时它会成为问题。
使用以下示例可以很好地说明这一点:
我们有一个订阅两个队列的简单消费应用程序,
一种用于命令,一种用于事件。这个应用程序有 7
消费者,每个人都有自己的渠道和 EventingBasicConsumer 我们
启动一个简单的发布应用程序,发布 20 条消息,一个
第二分开。每条消息都是一个事件,因此会发布到该事件
队列除了第 5 条和第 10 条消息,它们是命令和
发送到命令队列。请注意,每个事件的处理都没有
延迟而命令需要 30 秒
以下 table 描述了我们观察到的关于为跨多个队列的消息分配多个通道的内容:
一旦 Message5 在 30 秒后与 C1 完成,Messaqe9 将立即分配给 C1 并立即处理
一旦 Message10 在 30 秒后与 C2 一起完成,Messaqe11 将立即分配给 C2 并立即进行处理
因此,对我们来说,通道的分配似乎是每个队列独立完成的 - 这意味着如果某些消息需要很长时间才能处理,您可以延迟执行。
是否可以在多个消费者订阅多个队列时,即使当前有消费者处于空闲状态,RabbitMQ 也可以分配一条消息由忙碌的消费者处理?
是否有任何文档解释 RabbitMQ 算法,该算法从消费者集合中选择 EventingBasicConsumer.received 触发哪些消费者?
我们已经解决了这个问题。
在 RMQ 文档 (https://www.rabbitmq.com/api-guide.html#consuming) 中,我们遇到了以下内容:
“每个通道都有自己的调度线程。对于每个通道一个消费者的最常见用例,这意味着消费者不会阻止其他消费者。如果每个通道有多个消费者,请注意 long-running 消费者可能持有向该频道上的其他消费者发送回调。”
在我们的代码中,每个通道有 2 个消费者,这意味着消费者可以阻止其他消费者。
我们更改为每个频道有一个消费者并解决了这个问题。
我们最近遇到了由 RabbitMQ 提供支持的应用程序的意外行为。 RabbitMQ 版本是 3.6.12,我们使用的是 .NET Client 5.0.1
应用程序订阅了两个队列,一个用于命令,另一个用于事件——我们也使用手动确认。 我们的应用程序配置为有 7 个消费者。每个都有自己的通道(IModel),每个都有自己的 EventingBasicConsumer 我们最终在 EventingBasicConsumer.Received 被触发时处理消息。
我们的应用程序必须在消息被路由到队列时尽可能接近地处理消息,到目前为止我们还没有遇到问题。 然而最近,我们发现当我们正在处理的一条消息需要很长时间才能完成时,它会延迟处理另一条消息,尽管有许多可用的消费者 (6) 并不忙。
请注意,我们观察到当应用程序仅订阅单个队列时不会发生此问题,当涉及多个队列时它会成为问题。
使用以下示例可以很好地说明这一点:
我们有一个订阅两个队列的简单消费应用程序, 一种用于命令,一种用于事件。这个应用程序有 7 消费者,每个人都有自己的渠道和 EventingBasicConsumer 我们 启动一个简单的发布应用程序,发布 20 条消息,一个 第二分开。每条消息都是一个事件,因此会发布到该事件 队列除了第 5 条和第 10 条消息,它们是命令和 发送到命令队列。请注意,每个事件的处理都没有 延迟而命令需要 30 秒
以下 table 描述了我们观察到的关于为跨多个队列的消息分配多个通道的内容:
一旦 Message5 在 30 秒后与 C1 完成,Messaqe9 将立即分配给 C1 并立即处理 一旦 Message10 在 30 秒后与 C2 一起完成,Messaqe11 将立即分配给 C2 并立即进行处理
因此,对我们来说,通道的分配似乎是每个队列独立完成的 - 这意味着如果某些消息需要很长时间才能处理,您可以延迟执行。
是否可以在多个消费者订阅多个队列时,即使当前有消费者处于空闲状态,RabbitMQ 也可以分配一条消息由忙碌的消费者处理?
是否有任何文档解释 RabbitMQ 算法,该算法从消费者集合中选择 EventingBasicConsumer.received 触发哪些消费者?
我们已经解决了这个问题。
在 RMQ 文档 (https://www.rabbitmq.com/api-guide.html#consuming) 中,我们遇到了以下内容: “每个通道都有自己的调度线程。对于每个通道一个消费者的最常见用例,这意味着消费者不会阻止其他消费者。如果每个通道有多个消费者,请注意 long-running 消费者可能持有向该频道上的其他消费者发送回调。”
在我们的代码中,每个通道有 2 个消费者,这意味着消费者可以阻止其他消费者。 我们更改为每个频道有一个消费者并解决了这个问题。