队列公平性和消息服务器

Queue Fairness and Messaging Servers

我正在寻找解决消息服务器和队列的 FIFO 性质的问题。在某些情况下,我想根据传递消息的顺序以外的标准将队列中的消息分发给消费者池。理想情况下,这将防止用户占用系统中的共享资源。以这个过于简化的场景为例:

鉴于每个用户的垃圾桶中都可能有大量邮件,我们有什么选择可以在不考虑排队时间的情况下允许并发处理每个垃圾桶?在我看来,有一些明显的解决方案:

在我们的例子中,创建一个单独的队列并为每个用户管理消费者确实不切实际。可以做到,但我认为如果合理的话,我真的更喜欢第二种选择。我们正在使用 RabbitMQ,但如果有更适合此任务的技术,则不一定与它绑定。

我很喜欢使用 Rabbit 的消息优先级来帮助随机传递的想法。通过随机分配一条消息的优先级在 1 到 10 之间,这应该有助于分发消息。这种方法的问题在于,如果队列永远不会完全清空,则具有最低优先级的消息可能会永远卡在队列中。我以为我可以在消息上使用 TTL,然后使用升级的优先级重新排队消息,但我在 docs:

中注意到了这一点

Messages which should expire will still only expire from the head of the queue. This means that unlike with normal queues, even per-queue TTL can lead to expired lower-priority messages getting stuck behind non-expired higher priority ones. These messages will never be delivered, but they will appear in queue statistics.

我担心我可能会用这种方法进入兔子洞。我想知道其他人是如何解决这个问题的。任何有关创意路由、消息传递模式或任何替代解决方案的反馈都将不胜感激。

一种解决方案是插入 Resequencer。该原则在 link 的诊断中进行了概述。在您的情况下,类似于:

  • 应用程序将其 DELETE 消息按原样发送到删除队列中。
  • Resequencer(你写的一个新组件)介于原始发布者和原始消费者之间。它:

    • 将消息从 DELETE 队列中拉入内存
    • 按用户将它们放入(内存中)队列
    • 将它们重新发布到新队列(例如 FairPriorityDeleteQueue),循环以公平地交错来自不同原始用户的任何消息
    • 限制其重新发布到 FairPriorityDeleteQueue 的速率,使得 FairPriorityDeleteQueue 的长度(可通过定期轮询 rabbitmq 管理 api 获得)永远不会超过您选择的某个整数 N,或者限制在与速率相关的某个速率-limited delete API 消费者使用。
    • 不会确认它从原始 DELETE 队列中取出的任何消息,直到将其重新发布到 FairPriorityDeleteQueue(因此您永远不会丢失消息)
  • 原始消费者改为订阅 FairPriorityDeleteQueue。

    • 您将这些消费者的 preFetchCount 设置得相当低 (<10),以防止它们依次在内存中批量缓冲 FairPriorityDeleteQueue 的内容。

--

注意事项:

  • 速率或长度限制发布到 and/or 从 FairPriorityDeleteQueue 中提取消息是必不可少的。如果您不限制,Resequencer 可能会在收到消息时尽可能快地传递消息,从而限制重新排序的可能性。
  • 重排序器当然在重排序时充当一种内存缓冲区。如果原始发布者可以突然将 非常 大量消息发布到队列中,您可能需要对 Resequencer 进程进行内存限制,以便它不会摄取超过其容量的信息。

您的特定场景得到了很大的帮助,因为您有一个外部因素(最终删除 API)限制了吞吐量。 如果没有这样的外部限制因素,为这种重新排序器选择最佳参数要困难得多,以在特定环境中平衡吞吐量与重新排序。

我认为在这种情况下不需要重新排序器。也许是,如果您需要确保按特定顺序删除项目。但这只有在您大致同时发送多条消息并且需要保证消费者端的顺序时才会发挥作用。

由于您提到的原因,您还应该避免超时情况。超时是为了告诉 RabbitMQ 一条消息不需要处理——或者它需要被路由到一个死信队列,以便我可以被其他代码处理。虽然你可以让超时工作,但我认为这不是一个好的选择。

优先级可能会解决部分问题,但可能会导致文件永远不会得到处理的情况。如果您有一条优先级为 1 的消息位于队列中的某处,并且您一直将优先级为 2、3、5、10 等的消息放入队列中,则 1 可能不会被处理。正如您所指出的,超时不能解决这个问题。

为了我的钱,我会建议一种不同的方法:针对单个文件连续发送删除请求。

即发送1条消息删除1个文件。等待回复说已经完成。然后发送下一条消息删除下一个文件。

这就是我认为这会奏效的原因,以及如何管理它:

长-运行 工作流,单个文件删除请求

在这种情况下,我建议使用 "saga"(又名 a long-running workflow object)的想法对问题采取多步骤方法。

当用户请求删除他们的垃圾桶时,您通过 rabbitmq 向可以处理删除过程的服务发送一条消息。该服务为该用户的垃圾桶创建传奇实例。

传奇收集了垃圾桶中需要删除的所有文件的列表。然后它开始发送删除单个文件的请求,一次一个。

对于删除单个文件的每个请求,saga 等待响应说文件已删除。

当 saga 收到消息说上一个文件已被删除时,它会发出下一个删除下一个文件的请求。

删除所有文件后,saga 会自行更新并更新系统的任何其他部分,告知垃圾桶为空。

处理多个用户

当您有一个用户请求删除时,对他们来说,事情会很快发生。他们很快就会清空垃圾。

u1 = User 1 Trashcan Delete Request

|u1|u1|u1|u1|u1|u1|u1|u1|u1|u1done|

当您有多个用户请求删除时,一次发送一个文件删除请求的过程意味着每个用户都有平等的机会获得下一个文件删除。

u1 = User 1 Trashcan Delete Request
u2 = User 2 Trashcan Delete Request

|u1|u2|u1|u1|u2|u2|u1|u2|u1|u2|u2|u1|u1|u1|u2|u2|u1|u2|u1|u1done|u2|u2done|

这样就可以共享资源删除文件了。总的来说,清空每个人的垃圾桶需要更长的时间,但他们会更快地看到进展,这是人们认为系统快速/响应他们的请求的一个重要方面。

优化小文件集与大文件集

在少数用户和少量文件的情况下,上述解决方案可能比一次性删除所有文件慢。毕竟,通过 rabbitmq 发送的消息会更多——每个需要删除的文件至少有 2 条消息(一个删除请求,一个删除确认响应)

要进一步优化它,您可以做几件事:

  1. 在像这样拆分工作之前要有最小垃圾桶大小。低于最低限度,您只需一次将其全部删除

  2. 将工作分成文件组,而不是一次一个。也许 10 或 100 个文件是一个更好的组大小,而不是一次 1 个文件

这些解决方案中的任何一个(或两者)都将通过减少发送的消息数量和稍微分批处理来帮助提高流程的整体性能。

您需要在您的真实场景中进行一些测试,以了解哪些(或可能两者)在哪些设置下会有所帮助。

许多用户问题

您可能会遇到另外一个问题 - 许多用户。如果您有 2 或 3 个用户请求删除,那不是什么大问题。

但是如果您有 100 或 1000 个用户请求删除,个人可能需要很长时间才能清空垃圾桶。

对于这种情况,您可能需要更高级别的控制流程,所有清空垃圾桶的请求都将由另一个 Saga 管理。这个传奇将限制活跃的垃圾桶删除传奇的数量。

例如,如果您有 10 个删除垃圾桶的活动请求,限速 saga 只会启动其中的 3 个,并且会等待一个完成,然后再启动下一个。

同样,出于性能原因,您需要测试您的实际场景以查看是否需要这样做并查看限制应该是多少。


在您的实际场景中可能还需要考虑其他场景,但我希望这能让您顺利完成! :)

所以我最终从网络路由器手册中取出了一页。这是路由器需要解决的问题,以允许公平的流量模式。 This video 对问题和解决方案进行了很好的分解。

将问题翻译成我的域:

以及解决方案:

负载平衡器是通道和已知数量队列的包装器,它使用加权算法在每个队列上接收的消息之间进行平衡。我们找到了一个 really interesting article/implementation 目前看来运行良好的

使用此解决方案,我还可以在消息发布后确定工作区的优先级以提高其吞吐量。这是一个非常好的功能。

摆在我面前的最大挑战是队列的管理。将有太多的队列在很长一段时间内无法绑定到交易所。我正在开发一些工具来管理它们的生命周期。