气流 2.2 TriggerDagRunOperator wait_for_completion 行为

Airflow 2.2 TriggerDagRunOperator wait_for_completion behavior

我对 TriggerDagRunOperator 有疑问,特别是 wait_for_completion 参数。

在迁移到 Airflow 2.2 之前,我们使用此运算符触发另一个 DAG 和一个 ExternalTaskSensor 以等待其完成。

在 Airflow 2.2 中,有一个名为 wait_for_completion 的新参数,如果设置为 True,则只有在触发的 DAG 完成时才会使任务完成。

这很好,但我想知道工人是否会在戳之间被释放。我知道 ExternalTaskSensor 曾经有一个参数 reschedule 可以用于大于 1m 的 poke,这将释放 poke 之间的工作槽 - 但我在文档中再也看不到它了。

我的问题是 wait_for_completion 参数是否会导致操作员在戳之间释放工人?看代码觉得不是这样,所以想验证一下。

如果不释放 worker 并且触发的 DAG 必然需要超过 1m 才能完成,这里最好的方法应该是什么?

我们使用的是 MWAA Airflow 2.2,所以我猜延迟运算符不是一个选项(如果在这种情况下它是一个解决方案)

TriggerDagRunOperator中使用wait_for_completion=True时,只要操作员是运行,工人就不会被释放。您可以在 operator implementation 中看到它。运算符使用 time.sleep(self.poke_interval)

正如您所指出的,有两种方法可以实现验证触发的 dag 已完成的目标:

  1. DAG A 使用 TriggerDagRunOperator 后跟 ExternalTaskSensor
  2. TriggerDagRunOperatorwait_for_completion=True
  3. 结合使用

但是除了您提到的资源问题之外,这两个选项并不完全等同。

在选项 1 中,如果触发的 DAG 失败,则 ExternalTaskSensor 将失败。

在选项 2 中考虑:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
my_op = TriggerDagRunOperator (
    task_id='task',
    trigger_dag_id="dag_b",
    ...,
    wait_for_completion=True,
    retries=2
)

如果 dag_b 失败,则 TriggerDagRunOperator 将重试,这将调用 dag_b 的另一个 DagRun。

两个选项都有效。您需要决定哪种行为适合您的用例。