当超过 "dagrun_timeout" 时气流触发 "on_failure_callback"
Airflow triggering the "on_failure_callback" when the "dagrun_timeout" is exceeded
目前正在为 Airflow 中的 运行 长任务设置警报。对于 cancel/fail 气流 dag,我在 default_args 中放置了“dagrun_timeout”,它满足了我的需要,fails/errors 气流 dag 在 运行太久(通常卡住)。唯一的问题是当超过 dagrun_timeout 时,“on_failure_callback”中的函数不会被调用,因为“on_failure_callback”在任务级别(我认为),而dagrun_timeout 处于 dag 级别。
如何在超过dagrun_timeout时执行“on_failure_callback”,或者如何指定dag失败时要调用的函数?还是我应该重新考虑我的方法?
尝试在 DAG 声明期间设置 on_failure_callback
:
with DAG(
dag_id="failure_callback_example",
on_failure_callback=_on_dag_run_fail,
...
) as dag:
...
解释是 default_args
中定义的 on_failure_callback
将仅传递给正在创建的 Tasks,而不传递给 DAG 对象。
这是一个尝试这种行为的例子:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
def _on_dag_run_fail(context):
print("***DAG failed!! do something***")
print(f"The DAG failed because: {context['reason']}")
print(context)
def _alarm(context):
print("** Alarm Alarm!! **")
task_instance: TaskInstance = context.get("task_instance")
print(f"Task Instance: {task_instance} failed!")
default_args = {
"owner": "mi_empresa",
"email_on_failure": False,
"on_failure_callback": _alarm,
}
with DAG(
dag_id="failure_callback_example",
start_date=datetime(2021, 9, 7),
schedule_interval=None,
default_args=default_args,
catchup=False,
on_failure_callback=_on_dag_run_fail,
dagrun_timeout=timedelta(seconds=45),
) as dag:
delayed = BashOperator(
task_id="delayed",
bash_command='echo "waiting..";sleep 60; echo "Done!!"',
)
will_fail = BashOperator(
task_id="will_fail",
bash_command="exit 1",
# on_failure_callback=_alarm,
)
delayed >> will_fail
您可以在调度程序日志中找到回调执行的日志 AIRFLOW_HOME/logs/scheduler/date/failure_callback_example
:
[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out
编辑:
在 context
字典中传递键 reason
以指定 DAG 运行 失败的原因。一些值是: 'reason': 'timed_out'
或 'reason': 'task_failure'
。这可用于根据 DAG 运行 失败的 reason
在回调中执行特定行为。
目前正在为 Airflow 中的 运行 长任务设置警报。对于 cancel/fail 气流 dag,我在 default_args 中放置了“dagrun_timeout”,它满足了我的需要,fails/errors 气流 dag 在 运行太久(通常卡住)。唯一的问题是当超过 dagrun_timeout 时,“on_failure_callback”中的函数不会被调用,因为“on_failure_callback”在任务级别(我认为),而dagrun_timeout 处于 dag 级别。
如何在超过dagrun_timeout时执行“on_failure_callback”,或者如何指定dag失败时要调用的函数?还是我应该重新考虑我的方法?
尝试在 DAG 声明期间设置 on_failure_callback
:
with DAG(
dag_id="failure_callback_example",
on_failure_callback=_on_dag_run_fail,
...
) as dag:
...
解释是 default_args
中定义的 on_failure_callback
将仅传递给正在创建的 Tasks,而不传递给 DAG 对象。
这是一个尝试这种行为的例子:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
def _on_dag_run_fail(context):
print("***DAG failed!! do something***")
print(f"The DAG failed because: {context['reason']}")
print(context)
def _alarm(context):
print("** Alarm Alarm!! **")
task_instance: TaskInstance = context.get("task_instance")
print(f"Task Instance: {task_instance} failed!")
default_args = {
"owner": "mi_empresa",
"email_on_failure": False,
"on_failure_callback": _alarm,
}
with DAG(
dag_id="failure_callback_example",
start_date=datetime(2021, 9, 7),
schedule_interval=None,
default_args=default_args,
catchup=False,
on_failure_callback=_on_dag_run_fail,
dagrun_timeout=timedelta(seconds=45),
) as dag:
delayed = BashOperator(
task_id="delayed",
bash_command='echo "waiting..";sleep 60; echo "Done!!"',
)
will_fail = BashOperator(
task_id="will_fail",
bash_command="exit 1",
# on_failure_callback=_alarm,
)
delayed >> will_fail
您可以在调度程序日志中找到回调执行的日志 AIRFLOW_HOME/logs/scheduler/date/failure_callback_example
:
[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out
编辑:
在 context
字典中传递键 reason
以指定 DAG 运行 失败的原因。一些值是: 'reason': 'timed_out'
或 'reason': 'task_failure'
。这可用于根据 DAG 运行 失败的 reason
在回调中执行特定行为。