气流:避免任务状态=跳过

Airflow: avoid task state = skipped

我想按照以下架构中的描述在 Airflow 中创建条件任务。预期情况如下:

我试过了:

trigger_rule=TriggerRule.ONE_FAILED

任务 4 保持跳过状态,all_done也是

trigger_rule=TriggerRule.ALL_DONE

我找到了这个解决方案: 但它对我不起作用。

我认为无论前面的任务是否成功,您都希望终止集群,因此 ALL_DONE 听起来很合适。除了 Start_Cluster。如果失败,则可能没有要终止的集群,尽管您可能需要 check/try 以防万一。

默认的trigger_rule是ALL_SUCCESS所以,例如,如果任务1失败,整个Dag都会失败,因为任务2需要任务1成功才能运行.

如果任何任务有可能失败,但您仍想终止集群,您将需要一些备用路径让 dag 遵循,例如使用 PythonBranchOperator 和 Python 回调函数。

另一种可能性是仅使用 运行 具有 "ONE_FAILURE" 的 trigger_rule 的虚拟运算符,然后 运行 终止集群任务。

例如,如果您将虚拟任务命名为 "Task_Failure" 这将是依赖链:

Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster

在那种情况下,Task_Failure 可能必须将 Terminate_Cluster trigger_rule 设置为 ONE_SUCCESS,因为有可能某些任务从未 run.If将最终任务设置为 ALL_DONE 并且之前的一些任务没有状态,它可能只是挂起或可能失败。

ALL_DONE和ALL_SUCCESS的区别: