任何 DAG 故障的全局警报
Global Alert on any DAG Failure
我目前有超过 100 个 DAG 运行 在生产中。我知道如何使用 on_failure_callback
和由上游故障触发的操作员添加警报,但是有没有一种方法可以将 Airflow 本身配置为在 DAG 失败时始终发送电子邮件,而不必通过并更新每个我的 DAG 单独发出失败警报?
据我所知不是,但我有这个助手来处理我的 global/default dag/operator 设置:
def on_failure_callback(context):
...
def on_success_callback(context):
...
def build_default_args(**kwargs):
default_args = {
'on_failure_callback': on_failure_callback,
'on_success_callback': on_success_callback,
'owner': 'me',
'queue': 'default',
'execution_timeout': timedelta(hours=1),
'retries': 3,
'retry_delay': timedelta(seconds=10),
}
default_args.update(kwargs)
return default_args
然后在每个 DAG 中:
dag = DAG(
dag_id='my_dag',
default_args=build_default_args(
start_date=datetime(2017, 9, 20),
execution_timeout=timedelta(hours=8), # overrides default
),
schedule_interval='@hourly',
)
或者一些自定义基础 DAG
class...但无论哪种方式,您仍然需要返回并更改您的 100+ DAG 一次。
我目前有超过 100 个 DAG 运行 在生产中。我知道如何使用 on_failure_callback
和由上游故障触发的操作员添加警报,但是有没有一种方法可以将 Airflow 本身配置为在 DAG 失败时始终发送电子邮件,而不必通过并更新每个我的 DAG 单独发出失败警报?
据我所知不是,但我有这个助手来处理我的 global/default dag/operator 设置:
def on_failure_callback(context):
...
def on_success_callback(context):
...
def build_default_args(**kwargs):
default_args = {
'on_failure_callback': on_failure_callback,
'on_success_callback': on_success_callback,
'owner': 'me',
'queue': 'default',
'execution_timeout': timedelta(hours=1),
'retries': 3,
'retry_delay': timedelta(seconds=10),
}
default_args.update(kwargs)
return default_args
然后在每个 DAG 中:
dag = DAG(
dag_id='my_dag',
default_args=build_default_args(
start_date=datetime(2017, 9, 20),
execution_timeout=timedelta(hours=8), # overrides default
),
schedule_interval='@hourly',
)
或者一些自定义基础 DAG
class...但无论哪种方式,您仍然需要返回并更改您的 100+ DAG 一次。