Airflow:从外部 DAG 获取任务状态

Airflow: Get status of a task from an external DAG

我想从外部 DAG 获取任务的状态。根据某些条件,我在 2 个不同的 DAG 中有相同的任务 运行。因此,我想从 DAG1 检查 DAG2 中此任务的状态。如果DAG2的任务状态是'running',那我就跳过DAG1的这个任务。

我尝试使用:

dag_runs = DagRun.find(dag_id=dag_id,execution_date=exec_dt)
for dag_run in dag_runs:
  dag_run.state

我不知道我们是否可以使用 DagRun 获取任务状态。 如果我使用 TaskDependencySensor,DAG 将不得不等到找到任务的 allowed_states。

有没有办法获取另一个 DAG 中任务的当前状态?

我使用下面的代码从另一个 DAG 获取任务的状态:

from airflow.api.common.experimental.get_task_instance import get_task_instance

def get_dag_state(execution_date, **kwargs):
    ti = get_task_instance('dag_id', 'task_id', execution_date)
    task_status = ti.current_state()
    return task_status

dag_status = BranchPythonOperator(
    task_id='dag_status',
    python_callable=get_dag_state,
    dag=dag
    )

可以找到更多详细信息here