气流 2.2 TriggerDagRunOperator wait_for_completion 行为
Airflow 2.2 TriggerDagRunOperator wait_for_completion behavior
我对 TriggerDagRunOperator
有疑问,特别是 wait_for_completion 参数。
在迁移到 Airflow 2.2 之前,我们使用此运算符触发另一个 DAG 和一个 ExternalTaskSensor
以等待其完成。
在 Airflow 2.2 中,有一个名为 wait_for_completion
的新参数,如果设置为 True,则只有在触发的 DAG 完成时才会使任务完成。
这很好,但我想知道工人是否会在戳之间被释放。我知道 ExternalTaskSensor
曾经有一个参数 reschedule
可以用于大于 1m 的 poke,这将释放 poke 之间的工作槽 - 但我在文档中再也看不到它了。
我的问题是 wait_for_completion
参数是否会导致操作员在戳之间释放工人?看代码觉得不是这样,所以想验证一下。
如果不释放 worker 并且触发的 DAG 必然需要超过 1m 才能完成,这里最好的方法应该是什么?
我们使用的是 MWAA Airflow 2.2,所以我猜延迟运算符不是一个选项(如果在这种情况下它是一个解决方案)
在TriggerDagRunOperator
中使用wait_for_completion=True
时,只要操作员是运行,工人就不会被释放。您可以在 operator implementation 中看到它。运算符使用 time.sleep(self.poke_interval)
正如您所指出的,有两种方法可以实现验证触发的 dag 已完成的目标:
- DAG A 使用
TriggerDagRunOperator
后跟 ExternalTaskSensor
- 将
TriggerDagRunOperator
与 wait_for_completion=True
结合使用
但是除了您提到的资源问题之外,这两个选项并不完全等同。
在选项 1 中,如果触发的 DAG 失败,则 ExternalTaskSensor
将失败。
在选项 2 中考虑:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
my_op = TriggerDagRunOperator (
task_id='task',
trigger_dag_id="dag_b",
...,
wait_for_completion=True,
retries=2
)
如果 dag_b
失败,则 TriggerDagRunOperator
将重试,这将调用 dag_b
的另一个 DagRun。
两个选项都有效。您需要决定哪种行为适合您的用例。
我对 TriggerDagRunOperator
有疑问,特别是 wait_for_completion 参数。
在迁移到 Airflow 2.2 之前,我们使用此运算符触发另一个 DAG 和一个 ExternalTaskSensor
以等待其完成。
在 Airflow 2.2 中,有一个名为 wait_for_completion
的新参数,如果设置为 True,则只有在触发的 DAG 完成时才会使任务完成。
这很好,但我想知道工人是否会在戳之间被释放。我知道 ExternalTaskSensor
曾经有一个参数 reschedule
可以用于大于 1m 的 poke,这将释放 poke 之间的工作槽 - 但我在文档中再也看不到它了。
我的问题是 wait_for_completion
参数是否会导致操作员在戳之间释放工人?看代码觉得不是这样,所以想验证一下。
如果不释放 worker 并且触发的 DAG 必然需要超过 1m 才能完成,这里最好的方法应该是什么?
我们使用的是 MWAA Airflow 2.2,所以我猜延迟运算符不是一个选项(如果在这种情况下它是一个解决方案)
在TriggerDagRunOperator
中使用wait_for_completion=True
时,只要操作员是运行,工人就不会被释放。您可以在 operator implementation 中看到它。运算符使用 time.sleep(self.poke_interval)
正如您所指出的,有两种方法可以实现验证触发的 dag 已完成的目标:
- DAG A 使用
TriggerDagRunOperator
后跟ExternalTaskSensor
- 将
TriggerDagRunOperator
与wait_for_completion=True
结合使用
但是除了您提到的资源问题之外,这两个选项并不完全等同。
在选项 1 中,如果触发的 DAG 失败,则 ExternalTaskSensor
将失败。
在选项 2 中考虑:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
my_op = TriggerDagRunOperator (
task_id='task',
trigger_dag_id="dag_b",
...,
wait_for_completion=True,
retries=2
)
如果 dag_b
失败,则 TriggerDagRunOperator
将重试,这将调用 dag_b
的另一个 DagRun。
两个选项都有效。您需要决定哪种行为适合您的用例。