气流 trigger_rule all_done 未按预期工作
Airflow trigger_rule all_done not working as expected
我在 Airflow 1.10.9 中有以下 DAG,其中 clean_folder 任务应该 运行 一旦所有先前的任务都成功、失败或被跳过。为了确保这一点,我将 clean_folder 运算符的 trigger_rule 参数设置为 "all_done":
t_clean_folders = BashOperator
bash_command=f"python {os.path.join(custom_resources_path, 'cleaning.py')} {args['n_branches']}",
trigger_rule='all_done',
task_id="clean_folder",
)
当跳过之前执行的分支中的所有任务时,此逻辑可以正常工作:
[图表视图][1]
然而,当分支成功执行时,clean_folder 任务被跳过:
[图表视图][2]
分支动态定义如下:
for b in range(args['n_branches']):
t_file_sensing = FileSensor(
filepath=f"{input_path}/input_{b}",
task_id=f"file_sensing_{b}",
poke_interval=60,
timeout=60*60*5,
soft_fail=True,
retries=3,
)
t_data_staging = BashOperator(
bash_command=f"python {os.path.join(custom_resources_path, 'staging.py')} {b}",
task_id=f"data_staging_{b}",
)
...
文档提供了“all_done”的以下定义:所有 parents 已完成执行。这是 trigger_rule 的正常行为吗?我可以更改什么以确保 clean_folder 在任何情况下(以及最后)都会 运行?谢谢!
[1]: https://i.stack.imgur.com/AGqqD.png
[2]: https://i.stack.imgur.com/7CG3r.png
如果可能,您应该考虑将 Airflow 版本至少升级到 1.10.15,以便从更新的 bug-fixes.
中受益
当每个父任务都被跳过时,clean_folder
和 dag_complete
都会执行,这让我感到非常惊讶。跳过任务时的行为是直接跳过其子任务而不先检查其 trigger_rules.
根据Airflow 1.10.9 Documentation on trigger_rules,
Skipped tasks will cascade through trigger rules all_success and all_failed but not all_done [...]
对于您的用例,您可以将工作流拆分为 2 个 DAG:
- 1 个 DAG 可以做你想做的一切,除了
t_clean_folder
- 1 DAG 执行
t_clean_folder
任务,前面是 an ExternalTaskSensor
我在 Airflow 1.10.9 中有以下 DAG,其中 clean_folder 任务应该 运行 一旦所有先前的任务都成功、失败或被跳过。为了确保这一点,我将 clean_folder 运算符的 trigger_rule 参数设置为 "all_done":
t_clean_folders = BashOperator
bash_command=f"python {os.path.join(custom_resources_path, 'cleaning.py')} {args['n_branches']}",
trigger_rule='all_done',
task_id="clean_folder",
)
当跳过之前执行的分支中的所有任务时,此逻辑可以正常工作: [图表视图][1]
然而,当分支成功执行时,clean_folder 任务被跳过: [图表视图][2]
分支动态定义如下:
for b in range(args['n_branches']):
t_file_sensing = FileSensor(
filepath=f"{input_path}/input_{b}",
task_id=f"file_sensing_{b}",
poke_interval=60,
timeout=60*60*5,
soft_fail=True,
retries=3,
)
t_data_staging = BashOperator(
bash_command=f"python {os.path.join(custom_resources_path, 'staging.py')} {b}",
task_id=f"data_staging_{b}",
)
...
文档提供了“all_done”的以下定义:所有 parents 已完成执行。这是 trigger_rule 的正常行为吗?我可以更改什么以确保 clean_folder 在任何情况下(以及最后)都会 运行?谢谢! [1]: https://i.stack.imgur.com/AGqqD.png [2]: https://i.stack.imgur.com/7CG3r.png
如果可能,您应该考虑将 Airflow 版本至少升级到 1.10.15,以便从更新的 bug-fixes.
中受益当每个父任务都被跳过时,clean_folder
和 dag_complete
都会执行,这让我感到非常惊讶。跳过任务时的行为是直接跳过其子任务而不先检查其 trigger_rules.
根据Airflow 1.10.9 Documentation on trigger_rules,
Skipped tasks will cascade through trigger rules all_success and all_failed but not all_done [...]
对于您的用例,您可以将工作流拆分为 2 个 DAG:
- 1 个 DAG 可以做你想做的一切,除了
t_clean_folder
- 1 DAG 执行
t_clean_folder
任务,前面是 an ExternalTaskSensor