Airflow:从外部 DAG 获取任务状态
Airflow: Get status of a task from an external DAG
我想从外部 DAG 获取任务的状态。根据某些条件,我在 2 个不同的 DAG 中有相同的任务 运行。因此,我想从 DAG1 检查 DAG2 中此任务的状态。如果DAG2的任务状态是'running',那我就跳过DAG1的这个任务。
我尝试使用:
dag_runs = DagRun.find(dag_id=dag_id,execution_date=exec_dt)
for dag_run in dag_runs:
dag_run.state
我不知道我们是否可以使用 DagRun 获取任务状态。
如果我使用 TaskDependencySensor,DAG 将不得不等到找到任务的 allowed_states。
有没有办法获取另一个 DAG 中任务的当前状态?
我使用下面的代码从另一个 DAG 获取任务的状态:
from airflow.api.common.experimental.get_task_instance import get_task_instance
def get_dag_state(execution_date, **kwargs):
ti = get_task_instance('dag_id', 'task_id', execution_date)
task_status = ti.current_state()
return task_status
dag_status = BranchPythonOperator(
task_id='dag_status',
python_callable=get_dag_state,
dag=dag
)
可以找到更多详细信息here
我想从外部 DAG 获取任务的状态。根据某些条件,我在 2 个不同的 DAG 中有相同的任务 运行。因此,我想从 DAG1 检查 DAG2 中此任务的状态。如果DAG2的任务状态是'running',那我就跳过DAG1的这个任务。
我尝试使用:
dag_runs = DagRun.find(dag_id=dag_id,execution_date=exec_dt)
for dag_run in dag_runs:
dag_run.state
我不知道我们是否可以使用 DagRun 获取任务状态。 如果我使用 TaskDependencySensor,DAG 将不得不等到找到任务的 allowed_states。
有没有办法获取另一个 DAG 中任务的当前状态?
我使用下面的代码从另一个 DAG 获取任务的状态:
from airflow.api.common.experimental.get_task_instance import get_task_instance
def get_dag_state(execution_date, **kwargs):
ti = get_task_instance('dag_id', 'task_id', execution_date)
task_status = ti.current_state()
return task_status
dag_status = BranchPythonOperator(
task_id='dag_status',
python_callable=get_dag_state,
dag=dag
)
可以找到更多详细信息here