气流 trigger_rule all_done 未按预期工作

Airflow trigger_rule all_done not working as expected

我在 Airflow 1.10.9 中有以下 DAG,其中 clean_folder 任务应该 运行 一旦所有先前的任务都成功、失败或被跳过。为了确保这一点,我将 clean_folder 运算符的 trigger_rule 参数设置为 "all_done":

t_clean_folders = BashOperator
            bash_command=f"python {os.path.join(custom_resources_path, 'cleaning.py')} {args['n_branches']}",
            trigger_rule='all_done',
            task_id="clean_folder",
        ) 

当跳过之前执行的分支中的所有任务时,此逻辑可以正常工作: [图表视图][1]

然而,当分支成功执行时,clean_folder 任务被跳过: [图表视图][2]

分支动态定义如下:

for b in range(args['n_branches']):
        t_file_sensing = FileSensor(
            filepath=f"{input_path}/input_{b}",
            task_id=f"file_sensing_{b}",
            poke_interval=60,
            timeout=60*60*5,
            soft_fail=True,
            retries=3,
        )
        t_data_staging = BashOperator(
            bash_command=f"python {os.path.join(custom_resources_path, 'staging.py')} {b}",
            task_id=f"data_staging_{b}",
        )
        ...

文档提供了“all_done”的以下定义:所有 parents 已完成执行。这是 trigger_rule 的正常行为吗?我可以更改什么以确保 clean_folder 在任何情况下(以及最后)都会 运行?谢谢! [1]: https://i.stack.imgur.com/AGqqD.png [2]: https://i.stack.imgur.com/7CG3r.png

如果可能,您应该考虑将 Airflow 版本至少升级到 1.10.15,以便从更新的 bug-fixes.

中受益

当每个父任务都被跳过时,clean_folderdag_complete 都会执行,这让我感到非常惊讶。跳过任务时的行为是直接跳过其子任务而不先检查其 trigger_rules.

根据Airflow 1.10.9 Documentation on trigger_rules

Skipped tasks will cascade through trigger rules all_success and all_failed but not all_done [...]

对于您的用例,您可以将工作流拆分为 2 个 DAG

  • 1 个 DAG 可以做你想做的一切,除了 t_clean_folder
  • 1 DAG 执行 t_clean_folder 任务,前面是 an ExternalTaskSensor