如何限制我的 cron worker 表单将消息推送到 RabbitMQ?
How to throttle my cron worker form pushing messages to RabbitMQ?
上下文:
我们有消费(订阅)来自 50 多个 RabbitMQ 队列的消息的微服务。
在两个地方为这个队列生成消息
应用进程遇到短暂延迟执行的业务逻辑(如发送邮件或通知其他服务),应用直接将消息发送到exchange(再发送到队列)。
当我们遇到long/delayed执行业务逻辑时我们有messages
table,其中包含必须在一段时间后执行的消息条目。
现在我们有 cron worker,它 运行 每 10 分钟扫描一次 messages
table 并将消息推送到 RabbitMQ。
场景:
假设消息 table 有 10,000 条消息将在下一个 cron 运行,
中排队
- 9.00 AM - Cron worker 运行s 并将 10,000 条消息排队到 RabbitMQ 队列。
- 我们确实有订阅者正在监听队列并开始使用消息,但由于系统中的某些问题或第 3 方响应时间延迟,每条消息都需要完成
1 Min
。
- 9.10 AM - 现在 cron worker 再一次 运行s 接下来的 10 分钟,看到还有 9000 多条消息尚未完成,时间也已经过去,所以它再次将 9000 多条重复消息推送到队列.
注意:消费消息的订阅者是幂等的,不会出现重复处理的问题
我有设计想法但不是最好的逻辑
我可以有 4 个状态(RequiresQueuing、Queued、Completed、Failed)
- 每当插入一条消息时,我都可以将状态设置为
RequiresQueuing
- 接下来当 cron worker 成功选择消息并将其推送到队列时,我可以将其设置为
Queued
- 订阅者完成后将队列状态标记为
Completed / Failed
。
上述逻辑存在问题,假设 RabbitMQ 以某种方式关闭或者在某些使用中我们清除了队列以进行维护。
现在标记为Queued
的消息处于错误状态,因为必须再次识别它们并且需要手动更改状态。
另一个例子
假设我有一个名为(事件)的 RabbitMQ 队列
这个事件队列有 5 个订阅者,每个订阅者从队列中获取 1 条消息,并且 post 这个事件使用 REST API 到另一个微服务(事件聚合器)。每个 API 调用通常需要 50 毫秒。
用例:
- 由于高负载,产生的事件数量变为 3 倍。
- 接受事件的微服务(事件聚合器)处理速度也变慢,响应时间从 50 毫秒增加到 1 分钟。
- Cron worker 遵循您上面提到的设计,每分钟将消息排队。现在队列变得太大了,但我也不能增加订阅者的数量,因为依赖的微服务(事件聚合器)也滞后。
现在的问题是,如果继续向事件队列发送消息,只会使队列膨胀。
https://www.rabbitmq.com/memory.html - 在阅读此页面时,我发现如果 rabbitmq 达到高水印分数(默认为 40%),它甚至不会接受连接。当然这可以改变,但这需要人工干预。
因此,如果队列长度增加,它会影响 rabbitmq 内存,这就是我想到在生产者级别进行节流的原因。
问题
- 我如何限制我的 cron worker 跳过那个特定的 运行 或以某种方式检查队列并确定它已经负载很重所以不要推送消息?
- 我如何处理上面提到的用例?有解决我问题的设计吗?有人遇到同样的问题吗?
提前致谢。
回答
检查已接受的答案关于使用 queueCount
进行节流的评论
您可以结合使用 QoS -(服务质量)和手动 ACK 来解决这个问题。
https://www.rabbitmq.com/tutorials/tutorial-two-python.html 中记录了您的确切场景。此示例适用于python,您也可以参考其他示例。
假设您有 1 个发布者和 5 个工作者脚本。让我们说这些是从同一个队列中读取的。每个工作脚本需要 1 分钟来处理一条消息。您可以在通道级别设置 QoS。如果将其设置为 1,那么在这种情况下,每个工作脚本将只分配 1 条消息。所以我们一次处理 5 条消息。在 5 个工作脚本之一执行 MANUAL ACK 之前,不会传递任何新消息。
如果要提高消息处理的吞吐量,可以增加工作节点数。
根据消息状态更新表的想法不是一个好的选择,数据库轮询是系统使用队列的主要原因,它会导致缩放问题。在某一时刻,您必须更新表,并且由于锁定和隔离级别,您会遇到瓶颈。
上下文:
我们有消费(订阅)来自 50 多个 RabbitMQ 队列的消息的微服务。
在两个地方为这个队列生成消息
应用进程遇到短暂延迟执行的业务逻辑(如发送邮件或通知其他服务),应用直接将消息发送到exchange(再发送到队列)。
当我们遇到long/delayed执行业务逻辑时我们有
messages
table,其中包含必须在一段时间后执行的消息条目。
现在我们有 cron worker,它 运行 每 10 分钟扫描一次 messages
table 并将消息推送到 RabbitMQ。
场景:
假设消息 table 有 10,000 条消息将在下一个 cron 运行,
中排队- 9.00 AM - Cron worker 运行s 并将 10,000 条消息排队到 RabbitMQ 队列。
- 我们确实有订阅者正在监听队列并开始使用消息,但由于系统中的某些问题或第 3 方响应时间延迟,每条消息都需要完成
1 Min
。 - 9.10 AM - 现在 cron worker 再一次 运行s 接下来的 10 分钟,看到还有 9000 多条消息尚未完成,时间也已经过去,所以它再次将 9000 多条重复消息推送到队列.
注意:消费消息的订阅者是幂等的,不会出现重复处理的问题
我有设计想法但不是最好的逻辑
我可以有 4 个状态(RequiresQueuing、Queued、Completed、Failed)
- 每当插入一条消息时,我都可以将状态设置为
RequiresQueuing
- 接下来当 cron worker 成功选择消息并将其推送到队列时,我可以将其设置为
Queued
- 订阅者完成后将队列状态标记为
Completed / Failed
。
上述逻辑存在问题,假设 RabbitMQ 以某种方式关闭或者在某些使用中我们清除了队列以进行维护。
现在标记为Queued
的消息处于错误状态,因为必须再次识别它们并且需要手动更改状态。
另一个例子
假设我有一个名为(事件)的 RabbitMQ 队列
这个事件队列有 5 个订阅者,每个订阅者从队列中获取 1 条消息,并且 post 这个事件使用 REST API 到另一个微服务(事件聚合器)。每个 API 调用通常需要 50 毫秒。
用例:
- 由于高负载,产生的事件数量变为 3 倍。
- 接受事件的微服务(事件聚合器)处理速度也变慢,响应时间从 50 毫秒增加到 1 分钟。
- Cron worker 遵循您上面提到的设计,每分钟将消息排队。现在队列变得太大了,但我也不能增加订阅者的数量,因为依赖的微服务(事件聚合器)也滞后。
现在的问题是,如果继续向事件队列发送消息,只会使队列膨胀。
https://www.rabbitmq.com/memory.html - 在阅读此页面时,我发现如果 rabbitmq 达到高水印分数(默认为 40%),它甚至不会接受连接。当然这可以改变,但这需要人工干预。
因此,如果队列长度增加,它会影响 rabbitmq 内存,这就是我想到在生产者级别进行节流的原因。
问题
- 我如何限制我的 cron worker 跳过那个特定的 运行 或以某种方式检查队列并确定它已经负载很重所以不要推送消息?
- 我如何处理上面提到的用例?有解决我问题的设计吗?有人遇到同样的问题吗?
提前致谢。
回答
检查已接受的答案关于使用 queueCount
进行节流的评论您可以结合使用 QoS -(服务质量)和手动 ACK 来解决这个问题。 https://www.rabbitmq.com/tutorials/tutorial-two-python.html 中记录了您的确切场景。此示例适用于python,您也可以参考其他示例。
假设您有 1 个发布者和 5 个工作者脚本。让我们说这些是从同一个队列中读取的。每个工作脚本需要 1 分钟来处理一条消息。您可以在通道级别设置 QoS。如果将其设置为 1,那么在这种情况下,每个工作脚本将只分配 1 条消息。所以我们一次处理 5 条消息。在 5 个工作脚本之一执行 MANUAL ACK 之前,不会传递任何新消息。
如果要提高消息处理的吞吐量,可以增加工作节点数。
根据消息状态更新表的想法不是一个好的选择,数据库轮询是系统使用队列的主要原因,它会导致缩放问题。在某一时刻,您必须更新表,并且由于锁定和隔离级别,您会遇到瓶颈。