如何限制我的 cron worker 表单将消息推送到 RabbitMQ?

How to throttle my cron worker form pushing messages to RabbitMQ?

上下文:

我们有消费(订阅)来自 50 多个 RabbitMQ 队列的消息的微服务。

在两个地方为这个队列生成消息

  1. 应用进程遇到短暂延迟执行的业务逻辑(如发送邮件或通知其他服务),应用直接将消息发送到exchange(再发送到队列)。

  2. 当我们遇到long/delayed执行业务逻辑时我们有messagestable,其中包含必须在一段时间后执行的消息条目。

现在我们有 cron worker,它 运行 每 10 分钟扫描一次 messages table 并将消息推送到 RabbitMQ。

场景:

假设消息 table 有 10,000 条消息将在下一个 cron 运行,

中排队
  1. 9.00 AM - Cron worker 运行s 并将 10,000 条消息排队到 RabbitMQ 队列。
  2. 我们确实有订阅者正在监听队列并开始使用消息,但由于系统中的某些问题或第 3 方响应时间延迟,每条消息都需要完成 1 Min
  3. 9.10 AM - 现在 cron worker 再一次 运行s 接下来的 10 分钟,看到还有 9000 多条消息尚未完成,时间也已经过去,所以它再次将 9000 多条重复消息推送到队列.

注意:消费消息的订阅者是幂等的,不会出现重复处理的问题

我有设计想法但不是最好的逻辑

我可以有 4 个状态(RequiresQueuing、Queued、Completed、Failed)

  1. 每当插入一条消息时,我都可以将状态设置为RequiresQueuing
  2. 接下来当 cron worker 成功选择消息并将其推送到队列时,我可以将其设置为 Queued
  3. 订阅者完成后将队列状态标记为 Completed / Failed

上述逻辑存在问题,假设 RabbitMQ 以某种方式关闭或者在某些使用中我们清除了队列以进行维护。

现在标记为Queued的消息处于错误状态,因为必须再次识别它们并且需要手动更改状态。

另一个例子

假设我有一个名为(事件)的 RabbitMQ 队列

这个事件队列有 5 个订阅者,每个订阅者从队列中获取 1 条消息,并且 post 这个事件使用 REST API 到另一个微服务(事件聚合器)。每个 API 调用通常需要 50 毫秒。

用例:

  1. 由于高负载,产生的事件数量变为 3 倍。
  2. 接受事件的微服务(事件聚合器)处理速度也变慢,响应时间从 50 毫秒增加到 1 分钟。
  3. Cron worker 遵循您上面提到的设计,每分钟将消息排队。现在队列变得太大了,但我也不能增加订阅者的数量,因为依赖的微服务(事件聚合器)也滞后。

现在的问题是,如果继续向事件队列发送消息,只会使队列膨胀。

https://www.rabbitmq.com/memory.html - 在阅读此页面时,我发现如果 rabbitmq 达到高水印分数(默认为 40%),它甚至不会接受连接。当然这可以改变,但这需要人工干预。

因此,如果队列长度增加,它会影响 rabbitmq 内存,这就是我想到在生产者级别进行节流的原因。

问题

  1. 我如何限制我的 cron worker 跳过那个特定的 运行 或以某种方式检查队列并确定它已经负载很重所以不要推送消息?
  2. 我如何处理上面提到的用例?有解决我问题的设计吗?有人遇到同样的问题吗?

提前致谢。

回答

检查已接受的答案关于使用 queueCount

进行节流的评论

您可以结合使用 QoS -(服务质量)和手动 ACK 来解决这个问题。 https://www.rabbitmq.com/tutorials/tutorial-two-python.html 中记录了您的确切场景。此示例适用于python,您也可以参考其他示例。

假设您有 1 个发布者和 5 个工作者脚本。让我们说这些是从同一个队列中读取的。每个工作脚本需要 1 分钟来处理一条消息。您可以在通道级别设置 QoS。如果将其设置为 1,那么在这种情况下,每个工作脚本将只分配 1 条消息。所以我们一次处理 5 条消息。在 5 个工作脚本之一执行 MANUAL ACK 之前,不会传递任何新消息。

如果要提高消息处理的吞吐量,可以增加工作节点数。

根据消息状态更新表的想法不是一个好的选择,数据库轮询是系统使用队列的主要原因,它会导致缩放问题。在某一时刻,您必须更新表,并且由于锁定和隔离级别,您会遇到瓶颈。