异常如何传递给 on_failure_callback?

How is exception passed to on_failure_callback?

我想获取传递给 on_failure_callback 的异常以检查错误是什么。例如,如果它在某个 DAG 中包含 'there are duplicates',则该函数将不会执行任何操作。否则,它将发送一封电子邮件。

但是,我看不到异常的格式。我在 Docker 中使用 Airflow 2.1.2 并且是我的 dag 定义如下:

with DAG(process_name,
         default_args=default_args,
         schedule_interval='@daily',
         max_active_runs=1,
         tags=['import', 'es'],
         on_failure_callback=known_error_dag
         ) as dag:
    operators

已尝试以下解决方案:

def known_error_dag(context):
    #  1
    ti = context['ti']
    ti.xcom_push(key='exception', value=context['exception'])

    #  2
    print(context['exception'])
    
    #  3
    logging.info(context['exception'])

我在 UI 和 docker 日志中都看不到异常。此外,它没有出现在 XCOM 中。

这个问题的答案不清楚我想要的是否可行:Get Exception details on Airflow on_failure_callback context

然而,天文学课程指出这确实是可能的。 https://academy.astronomer.io/astronomer-certification-apache-airflow-dag-authoring-preparation

您可以在 DAG 和任务级别上定义 on_failure_callback。异常仅传递给任务级别的失败回调,因此在您的操作员上配置回调,或通过 DAG 上的 default_args 将回调配置给所有操作员:

with DAG(
    process_name,
    default_args=default_args,
    schedule_interval='@daily',
    max_active_runs=1,
    tags=['import', 'es'],
    default_args={
        "on_failure_callback": known_error_dag,
    },
) as dag:

在 DAG 级别上定义的 on_failure_callback 也将采用上下文变量,其中包括一个键“reason”,但在DAG 失败 运行,因此在大多数情况下不是很有用。