在任务 success/failure 上重新安排 DAG

Reschedule DAG on task success/failure

考虑一个非常简单的 Apache Airflow DAG:

FileSensor -> PythonOperator

其中 FileSensor 正在等待一些文件出现(poke_interval 相对较短)并且 PythonOperator 处理这些文件。此 DAG 被无限期地安排为 @once 至 运行 - 我如何将其设置为在成功(或失败)后从 PythonOperator 中再次重新安排为 运行?

总的来说,我认为 Elad 的建议可能会奏效,但我认为这是一种不好的做法。 DAG 的设计(和名称)是非循环的,因此在其中创建任何类型的循环都可能导致它出现意外行为。

同样根据 Airflow 文档,如果您计划使用外部 dag 触发器,您应该将 dag 计划设置为 None。就我个人而言,我不确定它是否一定会破坏某些东西,但它绝对可以为您提供意想不到的输出。如果出现问题,您稍后也可能需要更长的时间来调试它。

恕我直言,更好的方法是让您尝试并重新考虑您的设计。如果您需要在失败时重新安排 dag,您可以利用传感器 https://www.astronomer.io/guides/what-is-a-sensor 的重新安排模式。不确定为什么要在成功时重新运行它,如果是源中有多个文件的情况,我想说的是在 dag 脚本中创建具有可变参数和 for 循环的多个传感器。

就像 @Elad suggested, TriggerDagRunOperator 一样。结合 reset_dag_runexecution_date 参数,我能够解决这个问题。