具有动态定义和弦的芹菜链
Celery chain with dynamically-defined chord
我正在尝试从签名列表生成和弦。
当链开始执行时,这个列表的长度是未知的。
例如。 task_c
的输出是一个长度为 n
的列表,该列表应该生成一个相同长度的要并行执行的 celery 签名列表。我只能通过部分签名访问上一个任务的结果.s()
,那么如何在我的链中动态定义这组签名?
例如。我该怎么做:
@task
def task_d(kwargs):
return [task_e.si(i) for i in random.sample(range(10, 30), 5)]
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
chord(
[
task_e.si(**kwargs),
task_e.si(**kwargs),
task_e.si(**kwargs),
],
my_callback.si(**kwargs),
),
),
...对此
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
chord(
[task_d.s(**kwargs)], <<<<<< ?
my_callback.si(**kwargs),
),
),
我认为你必须这样做:
@task
def task_d(list_of_length_n):
tasks = [task_e.si(i) for i in list_of_length_n]
chord(tasks, my_callback.si(**kwargs))()
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
task_d.s(),
)
我正在尝试从签名列表生成和弦。
当链开始执行时,这个列表的长度是未知的。
例如。 task_c
的输出是一个长度为 n
的列表,该列表应该生成一个相同长度的要并行执行的 celery 签名列表。我只能通过部分签名访问上一个任务的结果.s()
,那么如何在我的链中动态定义这组签名?
例如。我该怎么做:
@task
def task_d(kwargs):
return [task_e.si(i) for i in random.sample(range(10, 30), 5)]
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
chord(
[
task_e.si(**kwargs),
task_e.si(**kwargs),
task_e.si(**kwargs),
],
my_callback.si(**kwargs),
),
),
...对此
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
chord(
[task_d.s(**kwargs)], <<<<<< ?
my_callback.si(**kwargs),
),
),
我认为你必须这样做:
@task
def task_d(list_of_length_n):
tasks = [task_e.si(i) for i in list_of_length_n]
chord(tasks, my_callback.si(**kwargs))()
ctask = chain(
task_a.si(**kwargs),
task_b.si(**kwargs),
task_c.si(**kwargs),
task_d.s(),
)