异常如何传递给 on_failure_callback?
How is exception passed to on_failure_callback?
我想获取传递给 on_failure_callback 的异常以检查错误是什么。例如,如果它在某个 DAG 中包含 'there are duplicates',则该函数将不会执行任何操作。否则,它将发送一封电子邮件。
但是,我看不到异常的格式。我在 Docker 中使用 Airflow 2.1.2 并且是我的 dag 定义如下:
with DAG(process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
on_failure_callback=known_error_dag
) as dag:
operators
已尝试以下解决方案:
def known_error_dag(context):
# 1
ti = context['ti']
ti.xcom_push(key='exception', value=context['exception'])
# 2
print(context['exception'])
# 3
logging.info(context['exception'])
我在 UI 和 docker 日志中都看不到异常。此外,它没有出现在 XCOM 中。
这个问题的答案不清楚我想要的是否可行:Get Exception details on Airflow on_failure_callback context
然而,天文学课程指出这确实是可能的。 https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation
您可以在 DAG 和任务级别上定义 on_failure_callback
。异常仅传递给任务级别的失败回调,因此在您的操作员上配置回调,或通过 DAG 上的 default_args
将回调配置给所有操作员:
with DAG(
process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
default_args={
"on_failure_callback": known_error_dag,
},
) as dag:
在 DAG 级别上定义的 on_failure_callback
也将采用上下文变量,其中包括一个键“reason
”,但在DAG 失败 运行,因此在大多数情况下不是很有用。
我想获取传递给 on_failure_callback 的异常以检查错误是什么。例如,如果它在某个 DAG 中包含 'there are duplicates',则该函数将不会执行任何操作。否则,它将发送一封电子邮件。
但是,我看不到异常的格式。我在 Docker 中使用 Airflow 2.1.2 并且是我的 dag 定义如下:
with DAG(process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
on_failure_callback=known_error_dag
) as dag:
operators
已尝试以下解决方案:
def known_error_dag(context):
# 1
ti = context['ti']
ti.xcom_push(key='exception', value=context['exception'])
# 2
print(context['exception'])
# 3
logging.info(context['exception'])
我在 UI 和 docker 日志中都看不到异常。此外,它没有出现在 XCOM 中。
这个问题的答案不清楚我想要的是否可行:Get Exception details on Airflow on_failure_callback context
然而,天文学课程指出这确实是可能的。 https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation
您可以在 DAG 和任务级别上定义 on_failure_callback
。异常仅传递给任务级别的失败回调,因此在您的操作员上配置回调,或通过 DAG 上的 default_args
将回调配置给所有操作员:
with DAG(
process_name,
default_args=default_args,
schedule_interval='@daily',
max_active_runs=1,
tags=['import', 'es'],
default_args={
"on_failure_callback": known_error_dag,
},
) as dag:
在 DAG 级别上定义的 on_failure_callback
也将采用上下文变量,其中包括一个键“reason
”,但在DAG 失败 运行,因此在大多数情况下不是很有用。