python rq worker 并行执行任务

python rq worker execute tasks in parallel

我不太了解 python rq,我才刚刚开始学习它。

有一个 task_a 需要 3 分钟才能完成处理。

@job
def task_a():
    time.sleep(180)
    print('done processing task_a')

def call_3_times():
    task_a.delay()
    task_a.delay()
    task_a.delay()

据我观察,task_a会从队列中一个一个执行。第一次通话结束后,再进行下一次通话,依此类推。总用时为 3 分钟 x 3 = 9 分钟

如何使call_3_times函数中的每个task_a并行执行?所以花费的时间不到 9 分钟,可能是 3 分 10 秒(只是一个例子,它可能会比这更快)。

可能我需要生成 3 个 rq worker 是的,它确实工作得更快并且像并行一样。但是,如果我需要调用它 2000 次怎么办。我应该产生 2000 个 rq 工人吗?我的意思是,一定有更好的方法来做到这一点。

如果使用rq,答案是肯定的,需要跨越更多的worker来进行并发。

这来自 rq 网站: http://python-rq.org/docs/workers/

每个工人一次处理一个作业。在一个 worker 中,没有并发处理正在进行。如果你想同时执行工作,只需启动更多的工人。


如果想找到解决办法,试试芹菜:http://docs.celeryproject.org

然后你可以这样做:

celery worker --concurrency=10

它提供工人级别的并发,所以你不需要 spwn 20000 工人什么的。

如果你需要调用任务2000次,你可以在队列中创建2000个作业,并且只有3个worker一次并行工作3个,直到所有作业完成。

工人的数量取决于您的服务器的规格。启动 2000 个 worker 以试图同时并行所有作业显然是不切实际的。如果您真的需要一次处理数千个作业,您有两个选择:

  1. 在工人农场(多台服务器)上分配工作
  2. 在每个工作函数中添加并发性,以便每个工作函数生成新线程或进程来完成实际工作。

选项 #2 取决于您从事的工作类型(I/O 或 CPU 绑定)。如果它是 IO 绑定和线程安全的,则在工作函数中使用线程,否则,使用多处理并增加资源依赖性的权衡。但是,如果您有资源来产生多个进程,为什么不首先增加复杂性较低的工作人员数量。

总而言之,根据您的任务类型。如果它是 I/O 绑定的,您可以执行#1/#2。如果它是 CPU 绑定的,您的选择仅限于关于您的服务器规格的#1。