redis 队列中的并发

Concurrency within redis queue

我正在使用托管在 heroku 上的 django 应用程序和 redistogo addon:nano 包。我正在使用 rq,在后台执行任务 - 任务由在线用户发起。我对增加连接数量有限制,恐怕资源有限。

我目前有一个工人 运行 超过 'n' 个队列。每个队列使用连接池中的连接实例来处理 'n' 不同类型的任务。例如,如果有 4 个用户启动相同类型的任务,我想让我的主要工作人员动态创建子进程来处理它。有没有办法实现所需的多处理和并发?

我尝试使用 multiprocessing 模块,最初没有引入 Lock();但这会使用先前的请求数据公开并覆盖用户传递给启动函数的数据。应用锁后,它通过返回 server error - 500

来限制第二个用户发起请求

github link #1:看起来团队正在做PR;虽然还没有发布!

github link #2:这个post有助于解释在运行时创建更多的worker。 但是,此解决方案也会覆盖数据。新请求再次使用之前的请求数据进行处理。

如果您需要查看一些代码,请告诉我。我会尝试 post 一个最小的可重现片段。

任何 thoughts/suggestions/guidelines?

深入了解后,我意识到 Worker class 已经在实施多处理。

work 函数在内部调用 execute_job(job, queue),后者又在模块

中引用

Spawns a work horse to perform the actual work and passes it a job.

The worker will wait for the work horse and make sure it executes within the given timeout bounds,

or will end the work horse with SIGALRM.

execute_job() 函数隐式地调用 fork_work_horse(job, queue),它产生一个工作马来执行实际工作,并按照以下逻辑将工作传递给它:


def fork_work_horse(self, job, queue):

        child_pid = os.fork()
        os.environ['RQ_WORKER_ID'] = self.name
        os.environ['RQ_JOB_ID'] = job.id
        if child_pid == 0:
            self.main_work_horse(job, queue)
        else:
            self._horse_pid = child_pid
            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))


main_work_horseperform_job(job, queue) 进行内部调用,这会进行一些其他调用以实际执行作业。

rq's official documentation page 中提到的有关 Worker Lifecycle 的所有步骤都在这些调用中得到处理。

这不是我所期待的多处理,但我想他们有办法做事。但是我原来的 post 仍然没有得到回答,而且我仍然不确定并发性..

那里的文档仍然需要处理,因为它很难涵盖这个库的真正本质!

你有机会尝试 AutoWorker 吗?

自动产生 RQ 工人。

from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()

它使用 multiprocessingredis 模块中的 StrictRedis 并从 rq

导入
from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus