如何在 Airflow 中设置 DAG 之间的依赖关系?

How to set dependencies between DAGs in Airflow?

我正在使用 Airflow 来安排批处理作业。我有一个 DAG (A) 每晚 运行s 和另一个 DAG (B) 运行s 每个月一次。 B 取决于 A 是否已成功完成。但是 B 需要很长时间才能 运行 因此我想将它保存在一个单独的 DAG 中以允许更好的 SLA 报告。

如何让 运行ning DAG B 在同一天依赖成功的 运行 DAG A?

您可以使用名为 ExternalTask​​Sensor 的运算符实现此行为。 您在 DAG(B) 中的任务 (B1) 将被安排并等待 DAG(A) 中的任务 (A2) 成功

External Task Sensor documentation

看起来像TriggerDagRunOperator can be used as well, and you can use a python callable to add some logic. As explained here : https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

当需要跨DAG依赖时,往往有两个需求:

  1. DAG B 上的任务 B1 需要在 DAG A 上的任务 A1 完成后 运行。正如其他人提到的那样,这可以使用 ExternalTaskSensor 来实现:

    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id='A',
                            external_task_id='A1',
                            mode="reschedule")
    
  2. 当用户清除 DAG A 上的任务 A1 时,我们希望 Airflow 清除 DAG B 上的任务 B1 以让它重新运行。这可以使用 ExternalTaskMarker 来实现(自 Airflow v1.10.8 起)。

    A1 = ExternalTaskMarker(task_id="A1", 
                            external_dag_id="B",
                            external_task_id="B1")
    

请参阅有关跨 DAG 依赖项的文档以获取更多详细信息:https://airflow.apache.org/docs/stable/howto/operator/external.html