具有动态定义和弦的芹菜链

Celery chain with dynamically-defined chord

我正在尝试从签名列表生成和弦。 当链开始执行时,这个列表的长度是未知的。 例如。 task_c 的输出是一个长度为 n 的列表,该列表应该生成一个相同长度的要并行执行的 celery 签名列表。我只能通过部分签名访问上一个任务的结果.s(),那么如何在我的链中动态定义这组签名?

例如。我该怎么做:

@task
def task_d(kwargs):
    return [task_e.si(i) for i in random.sample(range(10, 30), 5)]

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
            [
                task_e.si(**kwargs),
                task_e.si(**kwargs),
                task_e.si(**kwargs),
            ],
            my_callback.si(**kwargs),
        ),
    ),

...对此

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    chord(
            [task_d.s(**kwargs)],     <<<<<< ?
            my_callback.si(**kwargs),
        ),
    ),

我认为你必须这样做:

@task
def task_d(list_of_length_n):
    tasks = [task_e.si(i) for i in list_of_length_n]
    chord(tasks, my_callback.si(**kwargs))()

ctask = chain(
    task_a.si(**kwargs),
    task_b.si(**kwargs),
    task_c.si(**kwargs),
    task_d.s(),
)