Airflow DAG 触发器 wait_for_completion 未按预期工作?

Airflow DAG trigger wait_for_completion not working as expected?

我有多个首先依赖于 initial_dag 运行ning 的 DAG,然后我希望依赖的 DAG 一个接一个地 运行。这是我拥有的:

dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=None,
    start_date=airflow.utils.dates.days_ago(1)
)

initial_dag = BashOperator(
    task_id='initial_dag',
    bash_command="python /home/airflow/gcs/dags/task.py",
    dag=dag
)

dependent_dag1 = TriggerDagRunOperator(
    task_id="dependent_dag1",
    trigger_dag_id="dependent_dag1",
    wait_for_completion=True,
    dag=dag
)

dependent_dag2 = TriggerDagRunOperator(
    task_id="dependent_dag2",
    trigger_dag_id="dependent_dag2",
    wait_for_completion=True,
    dag=dag
)

dependent_dag3 = TriggerDagRunOperator(
    task_id="dependent_dag3",
    trigger_dag_id="dependent_dag3",
    wait_for_completion=True,
    dag=dag
)

initial_dag >> dependent_dag1 >> dependent_dag2 >> dependent_dag3

我认为 wait_for_completion=True 会在触发下一个 DAG 之前完成每个 DAG 的 运行。例如。 initial_dag 运行s 并完成,然后触发 dependent_dag1 并等待其完成以触发后续任务。

DAG 被触发的顺序是正确的,但它似乎并没有等待前一个 DAG 先完成,例如dependent_dag2dependent_dag1 完成之前被触发。

我是不是漏掉了什么?

wait_for_completion参数是完成任务而不是DAG本身。该任务在成功触发 DAG 时被标记为已完成,因此它不会特别等待该 DAG 的完成。

您至少有两个选择:

  1. 在触发器调用之间使用ExternalTaskSensor等待上一个DAG的最后一个任务。
  2. 在依赖 DAG 的末尾有一个 TriggerDagRunOperator。例如dependent_dag1的最后一个任务会是TriggerDagRunOperator到运行dependent_dag2等等。

您的选择将主要取决于更改选项 2 的 DAG 的可能性,以及您希望拥有的灵活性(认为如果您使用选项 1,则需要跟踪相关 DAG 的最后一个任务,但它仍然更灵活)。

This was answered on the Airflow GitHub discussion board but to bring both threads together here for other users.

不幸的是,wait_for_completion 参数在 1.10.x 版本(参见 documentation)中不可用,并且作为通用 kwarg.此参数从 2.0 开始可用。

如果需要先等待上一个DAG完成,可以考虑使用ExternalTask​​Sensor代替TriggerDagRunOperator。

在此处查看文档:https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/sensors/external_task_sensor/index.html

此运算符将等待另一个 DAG(或另一个 DAG 的任务)以指定状态(默认为“成功”)完成,直到继续前进。