具有多个连接的 Rq Worker

Rq Worker with multiple connections

我在同一个网络中有 3 个服务器。在这些服务器中的每一个上,一个 redis 服务和某种生产者是 运行。生产者将作业排入名为 tasks 的本地 rq 队列。 所以每个服务器都有自己的 tasks 队列。

此外,还有一台服务器是 运行 rq worker。是否可以让该工作人员检查 3 台服务器中每台服务器上的 tasks 队列?

我已经尝试创建连接列表

import redis
from rq import Queue, Worker
from rq import push_connection
# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))

然后我用它来创建队列列表。

queues = list(map(lambda c: Queue('tasks', connection=c), connections))

然后我推送所有连接

for connection in connections:
    push_connection(connection)

并将队列传递给 Worker

Worker(queues=queues).work()

这导致工作人员只监听 tasks 最后推送的任何连接。

我一直在研究 rq 上的代码,我想我可以编写一个自定义工作程序 class 来执行此操作,但在我这样做之前,我想问一下是否还有其他方法。甚至可能完全是另一个队列框架?

好的,我解决了这个问题。我仍然不确定我是否有权在此处使用 post 实际源代码,因此我将概述我的解决方案。

我不得不覆盖 register_birth(self)register_death(self)dequeue_job_and_maintain_ttl(self, timeout)。这些函数的原始实现可以在 here.

中找到

register_birth

基本上,您必须遍历所有连接,push_connection(connection),完成注册过程,然后pop_connection()

注意只在 mapping 变量中列出与该连接对应的队列。最初的实现使用 queue_names(self) 来获取队列名称列表。您必须执行与 queue_names(self) 相同的操作,但仅限于相关队列。

register_death

register_birth基本相同。遍历所有连接,push_connection(connection),完成与原实现相同的步骤,pop_connection().

dequeue_job_and_maintain_ttl

我们来看看original implementation of this function. We'll want to keep everything the same until we get to the try block. Here we want to iterate over all connections endlessly. You can do this by using itertools.cycle.

在循环 push_connection(connection) 内,将 self.connection 设置为当前连接。如果缺少 self.connection = connection,作业的结果可能无法正确返回。

现在我们将继续调用 self.queue_class.dequeue_any 类似于原始实现。但是我们会将超时设置为 1,这样我们就可以继续检查另一个连接,如果当前的连接没有任何工人的工作。

确保使用与当前连接对应的队列列表调用 self.queue_class.dequeue_any。在这种情况下 queues 仅包含相关队列。

result = self.queue_class.dequeue_any(
    queues, 1, connection=connection, job_class=self.job_class)

之后 pop_connection(),并在 result 上执行与原始实施相同的检查。如果 result 不是 None,我们找到了工作要做,需要 break 跳出循环。

保留原始实现中的所有其他内容。不要忘记 try 块末尾的 break。它跳出了 while True 循环。

另一件事

队列包含对其连接的引用。您可以使用它来创建 (connection, queues) 的列表,其中 queues 包含具有连接 connection.

的所有队列

如果将结果列表传递给 itertools.cycle,您将获得覆盖 dequeue_job_and_maintain_ttl.

所需的无限迭代器