如何在外部传感器气流中检查 DAG 中任务的不同 运行 次
How to check different run times of a task in a DAG in an External Sensor Airflow
假设我有一个 DAG A 包括一些任务,这些任务依赖于 DAG B 中另一个任务的一些外部传感器。
例如,我想在 10:00 上检查 DAG B 中的任务状态,如果此 运行成功,则DAG A中的任务可以运行。
但是现在因为一个原因,在10:00DAG B的任务失败了,但是同一个任务的运行 11:00 成功。
问题是 DAG A 中的任务将永远挂起,因为 DAG B 中的任务在 10 失败:00。不过如果下一个运行有运行成功就好了。
我如何在外部传感器气流中实现这样的功能来检查另一个 DAG 中下一个 运行 时间的状态,如果它成功,那么我的任务可以 运行 没有问题?
P.S: 由于某些原因我无法使用重试!
提前致谢。
我自己找到了解决办法。这可能不是最好的方法,但它确实有效。
在这种情况下,我们可以为 on_failure_callback
定义一个函数,并为我们的 ExternalSensor 设置一个 timeout,当达到超时时,我们检查另一个 DAG 中任务的下一个 运行,如果成功,我们将 ExternalSensor 的状态设置为 SUCCESS
因此依赖此传感器的其他任务可以 运行 没有问题。
这是此方法的代码:
from airflow.utils.state import State
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.exceptions import AirflowSensorTimeout
from datetime import datetime, timedelta, timezone
from airflow.api.common.experimental.get_task_instance import get_task_instance
from dateutil.parser import parse
from functools import partial
def _failure_callback(task_id, dag_id, execution_date, context):
if isinstance(context['exception'], AirflowSensorTimeout):
sensor_instance = context['task_instance']
next_execution_date = parse(context['ts']) + -(execution_date) + timedelta(hours=1)
ti = get_task_instance(dag_id=dag_id, task_id=task_id, execution_date=next_execution_date)
if ti.current_state() == 'success':
sensor_instance.set_state(State.SUCCESS)
sensor = ExternalTaskSensor(external_task_id='external_task_id',
task_id='sensor',
external_dag_id='external_dag_id',
execution_delta=timedelta(hours=-24) + timedelta(minutes=-30),
timeout=5,
on_failure_callback=partial(_failure_callback, 'external_task_id', 'external_dag_id', timedelta(hours=-24) + timedelta(minutes=-30)),
dag=dag)
假设我有一个 DAG A 包括一些任务,这些任务依赖于 DAG B 中另一个任务的一些外部传感器。
例如,我想在 10:00 上检查 DAG B 中的任务状态,如果此 运行成功,则DAG A中的任务可以运行。 但是现在因为一个原因,在10:00DAG B的任务失败了,但是同一个任务的运行 11:00 成功。
问题是 DAG A 中的任务将永远挂起,因为 DAG B 中的任务在 10 失败:00。不过如果下一个运行有运行成功就好了。
我如何在外部传感器气流中实现这样的功能来检查另一个 DAG 中下一个 运行 时间的状态,如果它成功,那么我的任务可以 运行 没有问题?
P.S: 由于某些原因我无法使用重试!
提前致谢。
我自己找到了解决办法。这可能不是最好的方法,但它确实有效。
在这种情况下,我们可以为 on_failure_callback
定义一个函数,并为我们的 ExternalSensor 设置一个 timeout,当达到超时时,我们检查另一个 DAG 中任务的下一个 运行,如果成功,我们将 ExternalSensor 的状态设置为 SUCCESS
因此依赖此传感器的其他任务可以 运行 没有问题。
这是此方法的代码:
from airflow.utils.state import State
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.exceptions import AirflowSensorTimeout
from datetime import datetime, timedelta, timezone
from airflow.api.common.experimental.get_task_instance import get_task_instance
from dateutil.parser import parse
from functools import partial
def _failure_callback(task_id, dag_id, execution_date, context):
if isinstance(context['exception'], AirflowSensorTimeout):
sensor_instance = context['task_instance']
next_execution_date = parse(context['ts']) + -(execution_date) + timedelta(hours=1)
ti = get_task_instance(dag_id=dag_id, task_id=task_id, execution_date=next_execution_date)
if ti.current_state() == 'success':
sensor_instance.set_state(State.SUCCESS)
sensor = ExternalTaskSensor(external_task_id='external_task_id',
task_id='sensor',
external_dag_id='external_dag_id',
execution_delta=timedelta(hours=-24) + timedelta(minutes=-30),
timeout=5,
on_failure_callback=partial(_failure_callback, 'external_task_id', 'external_dag_id', timedelta(hours=-24) + timedelta(minutes=-30)),
dag=dag)