芹菜将并行任务链接到和弦中

Celery chaining parallel tasks into chord

我正在努力思考这个异步任务处理设置。到目前为止,我一直在考虑使用 Celery,但还没有锁定任何东西。唯一的要求是我可以使用 Redis 作为代理并在多个节点上分发任务。

       ->  Task2  ->  Task3
Task1  ->  Task2  ->  Task3    [then]    Task4
       ->  Task2  ->  Task3

解释:

那么问题是,我如何用 Celery 做到这一点?

可以用和弦函数和链函数来完成,请看例子。它应该适合您的需要。

from celery import Celery, chord, chain

backend = 'redis://redis:6379/'
app = Celery(result_backend=backend, backend=backend)


@app.task
def task1():
    argument = 123
    return chord([
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
    ])(task4.s())


@app.task
def task2(argument):
    pass


@app.task
def task3(result_task2):
    pass


@app.task
def task4(result):
    pass


task1.apply_async()