使用链和组在芹菜中设计工作流程
Design workflow in celery using chain and group
我是 celery 的新手,我正在尝试使用 Chain、Groups 和 Chord 在 celery 中设计工作流程。这是我到目前为止所做的:
def __chainfileprocessing(config):
filelist, src = get_files_for_processing(config)
for fileattr in filelist:
chain( download_file.s(fileattr, src)
,importdata.s(fileattr, src)
,post_processing.s(fileattr, src)
).apply_async()
当前执行顺序:
- 正在执行所有 download_file() 任务
- 然后所有 import_data() 任务都被执行
- 然后所有 post_processing() 任务都被执行
我需要的:
对于文件列表中的每个项目,任务应按 download_file() => import_data() => post_processing()
顺序执行。
您的代码正在执行您所说的操作。 chain
将依次为 download_file
、import_date
、post_processing
启动文件列表中的每个项目的任务。
您的代码所做的是:
- 为文件 A 异步启动一系列任务(
download_file
,然后 importdata
,然后 post_processing
);这将启动文件 A 的 download_file
。完成后,它将启动文件 A 的 importdata
任务。apply_async
returns 立即;它不会等待任何任务完成。
- 为文件 B 异步启动一系列任务(
download_file
,然后 importdata
,然后 post_processing
);这会为文件 B 启动 download_file
。完成后,它将启动文件 B 的 importdata
任务。文件 A 的 download_file
任务可能是 运行,但是此调用不知道;它只是将任务添加到队列中。
- 等等
在循环结束时,您发送给 celery 的是 download_file
文件 A...n。当每个 download_file
任务完成时,它将提交其链中的下一个任务。
不同文件的任务之间没有依赖关系;从您发布的代码来看,它看起来并不需要(为什么文件 2 必须等待文件 1 完成?)
我是 celery 的新手,我正在尝试使用 Chain、Groups 和 Chord 在 celery 中设计工作流程。这是我到目前为止所做的:
def __chainfileprocessing(config):
filelist, src = get_files_for_processing(config)
for fileattr in filelist:
chain( download_file.s(fileattr, src)
,importdata.s(fileattr, src)
,post_processing.s(fileattr, src)
).apply_async()
当前执行顺序:
- 正在执行所有 download_file() 任务
- 然后所有 import_data() 任务都被执行
- 然后所有 post_processing() 任务都被执行
我需要的:
对于文件列表中的每个项目,任务应按 download_file() => import_data() => post_processing()
顺序执行。
您的代码正在执行您所说的操作。 chain
将依次为 download_file
、import_date
、post_processing
启动文件列表中的每个项目的任务。
您的代码所做的是:
- 为文件 A 异步启动一系列任务(
download_file
,然后importdata
,然后post_processing
);这将启动文件 A 的download_file
。完成后,它将启动文件 A 的importdata
任务。apply_async
returns 立即;它不会等待任何任务完成。 - 为文件 B 异步启动一系列任务(
download_file
,然后importdata
,然后post_processing
);这会为文件 B 启动download_file
。完成后,它将启动文件 B 的importdata
任务。文件 A 的download_file
任务可能是 运行,但是此调用不知道;它只是将任务添加到队列中。 - 等等
在循环结束时,您发送给 celery 的是 download_file
文件 A...n。当每个 download_file
任务完成时,它将提交其链中的下一个任务。
不同文件的任务之间没有依赖关系;从您发布的代码来看,它看起来并不需要(为什么文件 2 必须等待文件 1 完成?)