芹菜将并行任务链接到和弦中
Celery chaining parallel tasks into chord
我正在努力思考这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并在多个节点上分发任务。
-> Task2 -> Task3
Task1 -> Task2 -> Task3 [then] Task4
-> Task2 -> Task3
解释:
- Task1 生成项目列表
- Task2 从 Task1 接收一个项目作为参数
- Task2 和 Task3 是链式的,每个链都是并行执行的
- 当所有 Task2-Task3 链完成时执行 Task4(不需要从 Task3 传递任何数据)
那么问题是,我如何用 Celery 做到这一点?
可以用和弦函数和链函数来完成,请看例子。它应该适合您的需要。
from celery import Celery, chord, chain
backend = 'redis://redis:6379/'
app = Celery(result_backend=backend, backend=backend)
@app.task
def task1():
argument = 123
return chord([
chain(task2.s(argument), task3.s()),
chain(task2.s(argument), task3.s()),
chain(task2.s(argument), task3.s()),
])(task4.s())
@app.task
def task2(argument):
pass
@app.task
def task3(result_task2):
pass
@app.task
def task4(result):
pass
task1.apply_async()
我正在努力思考这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并在多个节点上分发任务。
-> Task2 -> Task3
Task1 -> Task2 -> Task3 [then] Task4
-> Task2 -> Task3
解释:
- Task1 生成项目列表
- Task2 从 Task1 接收一个项目作为参数
- Task2 和 Task3 是链式的,每个链都是并行执行的
- 当所有 Task2-Task3 链完成时执行 Task4(不需要从 Task3 传递任何数据)
那么问题是,我如何用 Celery 做到这一点?
可以用和弦函数和链函数来完成,请看例子。它应该适合您的需要。
from celery import Celery, chord, chain
backend = 'redis://redis:6379/'
app = Celery(result_backend=backend, backend=backend)
@app.task
def task1():
argument = 123
return chord([
chain(task2.s(argument), task3.s()),
chain(task2.s(argument), task3.s()),
chain(task2.s(argument), task3.s()),
])(task4.s())
@app.task
def task2(argument):
pass
@app.task
def task3(result_task2):
pass
@app.task
def task4(result):
pass
task1.apply_async()