如何在外部传感器气流中检查 DAG 中任务的不同 运行 次

How to check different run times of a task in a DAG in an External Sensor Airflow

假设我有一个 DAG A 包括一些任务,这些任务依赖于 DAG B 中另一个任务的一些外部传感器。

例如,我想在 10:00 上检查 DAG B 中的任务状态,如果此 运行成功,则DAG A中的任务可以运行。 但是现在因为一个原因,在10:00DAG B的任务失败了,但是同一个任务的运行 11:00 成功。

问题是 DAG A 中的任务将永远挂起,因为 DAG B 中的任务在 10 失败:00。不过如果下一个运行有运行成功就好了。

我如何在外部传感器气流中实现这样的功能来检查另一个 DAG 中下一个 运行 时间的状态,如果它成功,那么我的任务可以 运行 没有问题?

P.S: 由于某些原因我无法使用重试!

提前致谢。

我自己找到了解决办法。这可能不是最好的方法,但它确实有效。 在这种情况下,我们可以为 on_failure_callback 定义一个函数,并为我们的 ExternalSensor 设置一个 timeout,当达到超时时,我们检查另一个 DAG 中任务的下一个 运行,如果成功,我们将 ExternalSensor 的状态设置为 SUCCESS 因此依赖此传感器的其他任务可以 运行 没有问题。 这是此方法的代码:

from airflow.utils.state import State
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.exceptions import AirflowSensorTimeout
from datetime import datetime, timedelta, timezone
from airflow.api.common.experimental.get_task_instance import get_task_instance
from dateutil.parser import parse
from functools import partial

def _failure_callback(task_id, dag_id, execution_date, context):
    if isinstance(context['exception'], AirflowSensorTimeout):
        sensor_instance = context['task_instance']
        next_execution_date = parse(context['ts']) + -(execution_date) + timedelta(hours=1)
        ti = get_task_instance(dag_id=dag_id, task_id=task_id, execution_date=next_execution_date)
        if ti.current_state() == 'success':
            sensor_instance.set_state(State.SUCCESS)



sensor = ExternalTaskSensor(external_task_id='external_task_id',
                              task_id='sensor',
                              external_dag_id='external_dag_id',
                              execution_delta=timedelta(hours=-24) + timedelta(minutes=-30),
                              timeout=5,
                              on_failure_callback=partial(_failure_callback, 'external_task_id', 'external_dag_id', timedelta(hours=-24) + timedelta(minutes=-30)),
                              dag=dag)