Airflow DAG 触发器 wait_for_completion 未按预期工作?
Airflow DAG trigger wait_for_completion not working as expected?
我有多个首先依赖于 initial_dag
运行ning 的 DAG,然后我希望依赖的 DAG 一个接一个地 运行。这是我拥有的:
dag = DAG(
dag_id=DAG_NAME,
default_args=default_args,
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(1)
)
initial_dag = BashOperator(
task_id='initial_dag',
bash_command="python /home/airflow/gcs/dags/task.py",
dag=dag
)
dependent_dag1 = TriggerDagRunOperator(
task_id="dependent_dag1",
trigger_dag_id="dependent_dag1",
wait_for_completion=True,
dag=dag
)
dependent_dag2 = TriggerDagRunOperator(
task_id="dependent_dag2",
trigger_dag_id="dependent_dag2",
wait_for_completion=True,
dag=dag
)
dependent_dag3 = TriggerDagRunOperator(
task_id="dependent_dag3",
trigger_dag_id="dependent_dag3",
wait_for_completion=True,
dag=dag
)
initial_dag >> dependent_dag1 >> dependent_dag2 >> dependent_dag3
我认为 wait_for_completion=True
会在触发下一个 DAG 之前完成每个 DAG 的 运行。例如。 initial_dag
运行s 并完成,然后触发 dependent_dag1
并等待其完成以触发后续任务。
DAG 被触发的顺序是正确的,但它似乎并没有等待前一个 DAG 先完成,例如dependent_dag2
在 dependent_dag1
完成之前被触发。
我是不是漏掉了什么?
wait_for_completion
参数是完成任务而不是DAG本身。该任务在成功触发 DAG 时被标记为已完成,因此它不会特别等待该 DAG 的完成。
您至少有两个选择:
- 在触发器调用之间使用
ExternalTaskSensor
等待上一个DAG的最后一个任务。
- 在依赖 DAG 的末尾有一个
TriggerDagRunOperator
。例如dependent_dag1
的最后一个任务会是TriggerDagRunOperator
到运行dependent_dag2
等等。
您的选择将主要取决于更改选项 2 的 DAG 的可能性,以及您希望拥有的灵活性(认为如果您使用选项 1,则需要跟踪相关 DAG 的最后一个任务,但它仍然更灵活)。
This was answered on the Airflow GitHub discussion board but to bring both threads together here for other users.
不幸的是,wait_for_completion
参数在 1.10.x 版本(参见 documentation)中不可用,并且作为通用 kwarg
.此参数从 2.0 开始可用。
如果需要先等待上一个DAG完成,可以考虑使用ExternalTaskSensor代替TriggerDagRunOperator。
此运算符将等待另一个 DAG(或另一个 DAG 的任务)以指定状态(默认为“成功”)完成,直到继续前进。
我有多个首先依赖于 initial_dag
运行ning 的 DAG,然后我希望依赖的 DAG 一个接一个地 运行。这是我拥有的:
dag = DAG(
dag_id=DAG_NAME,
default_args=default_args,
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(1)
)
initial_dag = BashOperator(
task_id='initial_dag',
bash_command="python /home/airflow/gcs/dags/task.py",
dag=dag
)
dependent_dag1 = TriggerDagRunOperator(
task_id="dependent_dag1",
trigger_dag_id="dependent_dag1",
wait_for_completion=True,
dag=dag
)
dependent_dag2 = TriggerDagRunOperator(
task_id="dependent_dag2",
trigger_dag_id="dependent_dag2",
wait_for_completion=True,
dag=dag
)
dependent_dag3 = TriggerDagRunOperator(
task_id="dependent_dag3",
trigger_dag_id="dependent_dag3",
wait_for_completion=True,
dag=dag
)
initial_dag >> dependent_dag1 >> dependent_dag2 >> dependent_dag3
我认为 wait_for_completion=True
会在触发下一个 DAG 之前完成每个 DAG 的 运行。例如。 initial_dag
运行s 并完成,然后触发 dependent_dag1
并等待其完成以触发后续任务。
DAG 被触发的顺序是正确的,但它似乎并没有等待前一个 DAG 先完成,例如dependent_dag2
在 dependent_dag1
完成之前被触发。
我是不是漏掉了什么?
wait_for_completion
参数是完成任务而不是DAG本身。该任务在成功触发 DAG 时被标记为已完成,因此它不会特别等待该 DAG 的完成。
您至少有两个选择:
- 在触发器调用之间使用
ExternalTaskSensor
等待上一个DAG的最后一个任务。 - 在依赖 DAG 的末尾有一个
TriggerDagRunOperator
。例如dependent_dag1
的最后一个任务会是TriggerDagRunOperator
到运行dependent_dag2
等等。
您的选择将主要取决于更改选项 2 的 DAG 的可能性,以及您希望拥有的灵活性(认为如果您使用选项 1,则需要跟踪相关 DAG 的最后一个任务,但它仍然更灵活)。
This was answered on the Airflow GitHub discussion board but to bring both threads together here for other users.
不幸的是,wait_for_completion
参数在 1.10.x 版本(参见 documentation)中不可用,并且作为通用 kwarg
.此参数从 2.0 开始可用。
如果需要先等待上一个DAG完成,可以考虑使用ExternalTaskSensor代替TriggerDagRunOperator。
此运算符将等待另一个 DAG(或另一个 DAG 的任务)以指定状态(默认为“成功”)完成,直到继续前进。