使用链和组在芹菜中设计工作流程

Design workflow in celery using chain and group

我是 celery 的新手,我正在尝试使用 Chain、Groups 和 Chord 在 celery 中设计工作流程。这是我到目前为止所做的:

def __chainfileprocessing(config):
    filelist, src = get_files_for_processing(config)
    for fileattr in filelist:
        chain(  download_file.s(fileattr, src)
                ,importdata.s(fileattr, src)
                ,post_processing.s(fileattr, src)
             ).apply_async()

当前执行顺序:

我需要的:

对于文件列表中的每个项目,任务应按 download_file() => import_data() => post_processing() 顺序执行。

您的代码正在执行您所说的操作。 chain 将依次为 download_fileimport_datepost_processing 启动文件列表中的每个项目的任务。

您的代码所做的是:

  • 为文件 A 异步启动一系列任务(download_file,然后 importdata,然后 post_processing);这将启动文件 A 的 download_file。完成后,它将启动文件 A 的 importdata 任务。apply_async returns 立即;它不会等待任何任务完成。
  • 为文件 B 异步启动一系列任务(download_file,然后 importdata,然后 post_processing);这会为文件 B 启动 download_file。完成后,它将启动文件 B 的 importdata 任务。文件 A 的 download_file 任务可能是 运行,但是此调用不知道;它只是将任务添加到队列中。
  • 等等

在循环结束时,您发送给 celery 的是 download_file 文件 A...n。当每个 download_file 任务完成时,它将提交其链中的下一个任务。

不同文件的任务之间没有依赖关系;从您发布的代码来看,它看起来并不需要(为什么文件 2 必须等待文件 1 完成?)