如何根据条件限制并发消息消耗

How to limit concurrent message consuming based on a criteria

场景(我已经简化了事情):

这个相当标准的模式运行良好。

问题:如果一个用户在同一分钟内启动了 10 个作业,而一天中的那个时间只有 10 个工作应用程序启动,则该最终用户实际上接管了所有工作他自己的计算时间。

问题:如何确保每个最终用户在任何时候只处理一个作业? (奖励:不得限制某些最终用户(例如管理员))

此外,我不希望前端应用程序阻止最终用户启动并发作业。我只希望最终用户等待他们的并发作业一次完成一个。

解决方案?:我应该为每个最终用户动态创建一个自动删除独占队列吗?如果是,我如何告诉工作应用程序开始使用这个队列?如何确保一个(且只有一个)工作人员将从该队列中消费?

rabbitMQ 本身没有提供这样的功能。 但是,您可以通过以下方式实现它。你将不得不使用轮询,这不是那么有效(与 subscribing/publishing 相比)。您还必须利用 Zookeeper 来协调不同的工作人员。

您将创建 2 个队列:1 个高优先级队列(用于管理作业)和 1 个低优先级队列(用于普通用户作业)。 10 个工作人员将从两个队列中检索消息。每个工作人员将执行一个无限循环(理想情况下,当队列为空时,有睡眠间隔),它将尝试从每个队列中交替检索消息:

  • 对于高优先级队列,worker 只是检索一条消息,处理它并向队列确认。
  • 对于低优先级队列,worker 尝试在 Zookeeper 中持有锁(通过写入特定的文件 znode),如果成功,则读取消息、处理它并确认。如果 zookeeper 写入不成功,其他人持有锁,所以这个 worker 跳过这一步并重复循环。

正如 Dimos 所说,您需要自己构建一些东西来实现它。这是一个替代实现,它需要一个额外的队列和一些持久存储。

  • 以及现有的作业队列,创建一个 "processable job queue"。只有满足您的业务规则的作业才会添加到此队列。
  • 为作业队列创建一个使用者(名为 "Limiter")。 Limiter 还需要持久存储(例如 Redis 或关系数据库)来记录当前正在处理的作业。限制器从作业队列中读取并写入可处理的作业队列。
  • 当工作应用程序完成处理作业时,它会向作业队列添加一个 "job finished" 事件。

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter | 
    ------------     ------------     ----------- 
                         ^                |                    
                         |                V                    
                         |     ------------------------       
                         |    () processable job queue )  
           job finished  |     ------------------------       
                         |                |
                         |                V
                         |     ------------------------
                         \-----| Job Processors (x10) |
                               ------------------------
    

limiter的逻辑如下:

  • 收到作业消息后,检查持久存储以查看是否已为当前用户 运行ning 作业:
    • 如果不是,则在存储中将作业记录为运行ning,并将作业消息添加到可处理作业队列中。
    • 如果现有作业正在 运行ning,则将存储中的作业记录为待处理作业。
    • 如果作业是针对管理员用户的,请始终将其添加到可处理作业队列中。
  • 收到 "job finished" 消息后,从持久存储的 "running jobs" 列表中删除该作业。然后检查该用户的待处理作业的存储:
    • 如果找到作业,将该作业的状态从待处理更改为 运行ning 并将其添加到可处理作业队列中。
    • 否则什么都不做。
  • 限制器进程一次只能 运行 一个实例。这可以通过仅启动限制器进程的单个实例或通过在持久存储中使用锁定机制来实现。

它相当重量级,但如果您需要查看发生了什么,您可以随时检查持久存储。