Airflow triggering the "on_failure_callback" when the "dagrun_timeout" is exceeded

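I'm currently setting up alerts for long-running tasks in Airflow. To cancel/fail the Airflow DAG, I put "dagrun_timeout" in default_args, and it does what I need: it fails/errors the DAG when it has been running too long (usually because it is stuck). The only problem is that the function in "on_failure_callback" is not called when dagrun_timeout is exceeded, because "on_failure_callback" is at the task level (I think), whereas dagrun_timeout is at the DAG level.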

How can I execute the "on_failure_callback" when dagrun_timeout is exceeded, or how can I specify a function to call when the DAG fails? Or should I rethink my approach?

Try setting on_failure_callback in the DAG declaration itself:

with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
    ...

The explanation is that an on_failure_callback defined in default_args is passed only to the Tasks being created, not to the DAG object.

Here is an example you can use to try this behavior:


from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,
}


with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    dagrun_timeout=timedelta(seconds=45),
) as dag:

    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,
    )
delayed >> will_fail

You can find the logs of the callback execution in the scheduler logs, under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:

[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out

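Edit:

The context dict passed to the callback includes a reason key that specifies why the DAG run failed. Some of its values are 'reason': 'timed_out' and 'reason': 'task_failure'. You can use this to run specific behavior in the callback depending on the reason the DAG run failed.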