redis 队列中的并发
Concurrency within redis queue
我正在使用托管在 heroku 上的 django 应用程序和 redistogo addon:nano 包。我正在使用 rq,在后台执行任务 - 任务由在线用户发起。我对增加连接数量有限制,恐怕资源有限。
我目前有一个工人 运行 超过 'n' 个队列。每个队列使用连接池中的连接实例来处理 'n' 不同类型的任务。例如,如果有 4 个用户启动相同类型的任务,我想让我的主要工作人员动态创建子进程来处理它。有没有办法实现所需的多处理和并发?
我尝试使用 multiprocessing
模块,最初没有引入 Lock()
;但这会使用先前的请求数据公开并覆盖用户传递给启动函数的数据。应用锁后,它通过返回 server error - 500
来限制第二个用户发起请求
github link #1:看起来团队正在做PR;虽然还没有发布!
github link #2:这个post有助于解释在运行时创建更多的worker。
但是,此解决方案也会覆盖数据。新请求再次使用之前的请求数据进行处理。
如果您需要查看一些代码,请告诉我。我会尝试 post 一个最小的可重现片段。
任何 thoughts/suggestions/guidelines?
深入了解后,我意识到 Worker
class 已经在实施多处理。
work
函数在内部调用 execute_job(job, queue)
,后者又在模块
中引用
Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes within the given timeout bounds,
or will end the work horse with SIGALRM.
execute_job()
函数隐式地调用 fork_work_horse(job, queue)
,它产生一个工作马来执行实际工作,并按照以下逻辑将工作传递给它:
def fork_work_horse(self, job, queue):
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
main_work_horse
对 perform_job(job, queue)
进行内部调用,这会进行一些其他调用以实际执行作业。
rq's official documentation page 中提到的有关 Worker Lifecycle 的所有步骤都在这些调用中得到处理。
这不是我所期待的多处理,但我想他们有办法做事。但是我原来的 post 仍然没有得到回答,而且我仍然不确定并发性..
那里的文档仍然需要处理,因为它很难涵盖这个库的真正本质!
你有机会尝试 AutoWorker 吗?
自动产生 RQ 工人。
from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()
它使用 multiprocessing
和 redis
模块中的 StrictRedis
并从 rq
导入
from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus
我正在使用托管在 heroku 上的 django 应用程序和 redistogo addon:nano 包。我正在使用 rq,在后台执行任务 - 任务由在线用户发起。我对增加连接数量有限制,恐怕资源有限。
我目前有一个工人 运行 超过 'n' 个队列。每个队列使用连接池中的连接实例来处理 'n' 不同类型的任务。例如,如果有 4 个用户启动相同类型的任务,我想让我的主要工作人员动态创建子进程来处理它。有没有办法实现所需的多处理和并发?
我尝试使用 multiprocessing
模块,最初没有引入 Lock()
;但这会使用先前的请求数据公开并覆盖用户传递给启动函数的数据。应用锁后,它通过返回 server error - 500
github link #1:看起来团队正在做PR;虽然还没有发布!
github link #2:这个post有助于解释在运行时创建更多的worker。 但是,此解决方案也会覆盖数据。新请求再次使用之前的请求数据进行处理。
如果您需要查看一些代码,请告诉我。我会尝试 post 一个最小的可重现片段。
任何 thoughts/suggestions/guidelines?
深入了解后,我意识到 Worker
class 已经在实施多处理。
work
函数在内部调用 execute_job(job, queue)
,后者又在模块
Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes within the given timeout bounds,
or will end the work horse with SIGALRM.
execute_job()
函数隐式地调用 fork_work_horse(job, queue)
,它产生一个工作马来执行实际工作,并按照以下逻辑将工作传递给它:
def fork_work_horse(self, job, queue):
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
main_work_horse
对 perform_job(job, queue)
进行内部调用,这会进行一些其他调用以实际执行作业。
rq's official documentation page 中提到的有关 Worker Lifecycle 的所有步骤都在这些调用中得到处理。
这不是我所期待的多处理,但我想他们有办法做事。但是我原来的 post 仍然没有得到回答,而且我仍然不确定并发性..
那里的文档仍然需要处理,因为它很难涵盖这个库的真正本质!
你有机会尝试 AutoWorker 吗?
自动产生 RQ 工人。
from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()
它使用 multiprocessing
和 redis
模块中的 StrictRedis
并从 rq
from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus