气流:避免任务状态=跳过
Airflow: avoid task state = skipped
我想按照以下架构中的描述在 Airflow 中创建条件任务。预期情况如下:
- 任务 1:Start_cluster 执行
- 如果任务 1 成功,则任务 2 执行
- 如果任务 2 成功,则任务 3 执行
- 如果所有任务都成功或一个任务失败,则执行任务4:terminate_cluster
我试过了:
trigger_rule=TriggerRule.ONE_FAILED
任务 4 保持跳过状态,all_done也是
trigger_rule=TriggerRule.ALL_DONE
我找到了这个解决方案: 但它对我不起作用。
我认为无论前面的任务是否成功,您都希望终止集群,因此 ALL_DONE
听起来很合适。除了 Start_Cluster。如果失败,则可能没有要终止的集群,尽管您可能需要 check/try 以防万一。
默认的trigger_rule是ALL_SUCCESS
所以,例如,如果任务1失败,整个Dag都会失败,因为任务2需要任务1成功才能运行.
如果任何任务有可能失败,但您仍想终止集群,您将需要一些备用路径让 dag 遵循,例如使用 PythonBranchOperator
和 Python 回调函数。
另一种可能性是仅使用 运行 具有 "ONE_FAILURE" 的 trigger_rule 的虚拟运算符,然后 运行 终止集群任务。
例如,如果您将虚拟任务命名为 "Task_Failure" 这将是依赖链:
Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster
在那种情况下,Task_Failure 可能必须将 Terminate_Cluster trigger_rule 设置为 ONE_SUCCESS
,因为有可能某些任务从未 run.If将最终任务设置为 ALL_DONE
并且之前的一些任务没有状态,它可能只是挂起或可能失败。
ALL_DONE和ALL_SUCCESS的区别:
我想按照以下架构中的描述在 Airflow 中创建条件任务。预期情况如下:
- 任务 1:Start_cluster 执行
- 如果任务 1 成功,则任务 2 执行
- 如果任务 2 成功,则任务 3 执行
- 如果所有任务都成功或一个任务失败,则执行任务4:terminate_cluster
我试过了:
trigger_rule=TriggerRule.ONE_FAILED
任务 4 保持跳过状态,all_done也是
trigger_rule=TriggerRule.ALL_DONE
我找到了这个解决方案:
我认为无论前面的任务是否成功,您都希望终止集群,因此 ALL_DONE
听起来很合适。除了 Start_Cluster。如果失败,则可能没有要终止的集群,尽管您可能需要 check/try 以防万一。
默认的trigger_rule是ALL_SUCCESS
所以,例如,如果任务1失败,整个Dag都会失败,因为任务2需要任务1成功才能运行.
如果任何任务有可能失败,但您仍想终止集群,您将需要一些备用路径让 dag 遵循,例如使用 PythonBranchOperator
和 Python 回调函数。
另一种可能性是仅使用 运行 具有 "ONE_FAILURE" 的 trigger_rule 的虚拟运算符,然后 运行 终止集群任务。
例如,如果您将虚拟任务命名为 "Task_Failure" 这将是依赖链:
Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster
在那种情况下,Task_Failure 可能必须将 Terminate_Cluster trigger_rule 设置为 ONE_SUCCESS
,因为有可能某些任务从未 run.If将最终任务设置为 ALL_DONE
并且之前的一些任务没有状态,它可能只是挂起或可能失败。
ALL_DONE和ALL_SUCCESS的区别: