How to use on_failure_callback in Airflow 1.10.10 + Composer?

I would like to receive an email notification whenever a single Airflow operator fails. I need this because the failure of some tasks must not mark the whole pipeline as failed.

To simulate an error, I pointed the source bucket at a bucket that does not exist.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = "Stefano Giostra"
__credits__ = "Stefano Giostra"
__maintainer__ = "Stefano Giostra"
__version__ = "0.9.3"
__status__ = "Dev"


from airflow.models import Variable, DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
import logging
from airflow.utils import dates
from datetime import timedelta
from functools import partial
from lib.bb_utils import load_json_file
from airflow.utils.email import send_email


ROOT_PATH = '/home/airflow/gcs/dags'
logger = logging.getLogger("dag_demo_2")


def notify_email(context, config):        # **kwargs
    """Send custom email alerts."""

    alerting_email_address = config.get('email_address')
    print("---> notify_email -------------------")
    print(context)
    print(f"-->{alerting_email_address}")
    print("<------------------------------------")
    # TODO: build the message from the callback context and send it, e.g.:
    # task_id = context['task_instance'].task_id
    # title = f"Airflow alert: {task_id} failed"
    # body = (
    #     "Hi,<br><br>"
    #     f"There's been an error in the {task_id} job.<br><br>"
    #     "Forever yours,<br>Airflow bot<br>"
    # )
    # send_email(alerting_email_address, title, body)


# ----------------------------------------------------------------------------------------------------------------------
# Dictionary with the keys required by Airflow DAGs
my_email = 'abc@xyz.com'
default_args = {
    "owner": 'SG',
    "depends_on_past": False,
    "start_date": dates.days_ago(1),
    "end_date": None,
    "email_on_failure": 'my_email',
    "email_on_retry": False,
    "email": [my_email],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "max_active_runs": 1,
    "on_failure_callback": partial(notify_email, config={'email_address': my_email})
}

dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=default_args, schedule_interval="@once") as ldag:
    project = Variable.get("PROJECT")
    source_bucket = 'sg-dev'
    source_object = 'covid19_italy/national_trends_2.csv'
    bq_dataset = "covid19_italy"
    bq_table_name = "national_trends"
    bq_task_id = f'gcs_to_bq_load_{bq_table_name}'
    schema_fields = load_json_file(f"{ROOT_PATH}/source/{bq_dataset}/{bq_table_name}_tabschema.json")

    t = GoogleCloudStorageToBigQueryOperator(
        dag=ldag,
        task_id=bq_task_id,
        bucket=source_bucket,
        source_objects=[source_object],
        destination_project_dataset_table="{0}.{1}.{2}".format(project, bq_dataset, bq_table_name),
        schema_fields=schema_fields,
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE"
    )

To have notify_email() called on failure, simply set in default_args:

"on_failure_callback": notify_email

default_args is then passed to the DAG constructor:

with DAG(dag_id='SG-DagDemo-Once', default_args=default_args) as dag:
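Note that Airflow invokes an on_failure_callback with a single argument, the task context dict, so assigning notify_email directly only works if the function takes just context; otherwise keep the functools.partial binding from the question. A minimal single-argument sketch, with the recipient address hard-coded purely for illustration:

from airflow.utils.email import send_email

def notify_email(context):
    """Send a simple failure alert for the failed task instance."""
    ti = context['task_instance']
    title = f"Airflow alert: {ti.task_id} failed"
    body = f"Task {ti.task_id} of DAG {ti.dag_id} failed.<br>"
    send_email('abc@xyz.com', title, body)  # illustrative address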

You can try something like the following to have notify_email() called when an operator fails; because the callback is set in default_args, every operator in the DAG will invoke the same function (example adapted from the gcs_to_bq sample):

import airflow
from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators import bash_operator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'on_failure_callback': notify_email
}

dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=args, schedule_interval=None) as dag:
    create_test_dataset = bash_operator.BashOperator(
        task_id='create_airflow_test_dataset',
        bash_command='bq mk airflow_test')

    # [START howto_operator_gcs_to_bq]
    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table='airflow_test.gcs_to_bq_table',
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')
    # [END howto_operator_gcs_to_bq]

    delete_test_dataset = bash_operator.BashOperator(
        task_id='delete_airflow_test_dataset',
        bash_command='bq rm -r -f -d airflow_test')

    create_test_dataset >> load_csv >> delete_test_dataset
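If only specific tasks should alert, on_failure_callback can also be set per operator (it is a BaseOperator argument), which overrides the value from default_args for that task. A small sketch reusing the bash_operator import and notify_email from above; the task is hypothetical and deliberately fails so the callback fires:

fail_and_alert = bash_operator.BashOperator(
    task_id='fail_and_alert',
    bash_command='exit 1',  # deliberately fails so the callback fires
    on_failure_callback=notify_email,  # set for this task only
)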

You can simulate an error by changing each operator's configuration (for example, pointing it at a nonexistent bucket, as in the question). You still need to complete the email-sending logic inside notify_email().
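For completeness, a hedged sketch of what the finished callback could look like, keeping the two-argument signature from the question, binding the config with functools.partial, and using Airflow's built-in send_email helper (the message wording is illustrative):

from functools import partial
from airflow.utils.email import send_email

def notify_email(context, config):
    """Send a custom email alert for the failed task instance."""
    ti = context['task_instance']
    title = f"Airflow alert: {ti.task_id} failed"
    body = (
        "Hi,<br><br>"
        f"There's been an error in the {ti.task_id} task of DAG {ti.dag_id}.<br><br>"
        "Forever yours,<br>Airflow bot<br>"
    )
    send_email(config['email_address'], title, body)

# bind the config once, then use this as the on_failure_callback
on_failure = partial(notify_email, config={'email_address': 'abc@xyz.com'})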