如何在 Airflow 1.10.10+composer 中使用 on_failure_callback?
How to use of on_failure_callback in Airflow 1.10.10+composer?
我希望在单个 Airflow Operator 失败时收到电子邮件通知。我需要它,因为某些任务的失败不能将整个管道设置为失败。
为了模拟错误,我将源存储桶设置为不存在的存储桶。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Stefano Giostra"
__credits__ = "Stefano Giostra"
__maintainer__ = "Stefano Giostra"
__version__ = "0.9.3"
__status__ = "Dev"
from airflow.models import Variable, DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# from lib.bb_utils import *
import logging
from airflow.utils import dates
from datetime import timedelta
from functools import partial
from lib.bb_utils import load_json_file
from airflow.utils.email import send_email
ROOT_PATH = '/home/airflow/gcs/dags'
logger = logging.getLogger("dag_demo_2")
def notify_email(context, config):  # **kwargs
    """Log the failing task's context and the configured alert recipient.

    Registered as an Airflow ``on_failure_callback`` (bound with
    ``functools.partial`` to supply ``config``).  Airflow passes the task
    ``context`` dict; ``config['email_address']`` names the recipient.
    Actually sending the mail via ``airflow.utils.email.send_email`` is
    still TODO.
    """
    recipient = config.get('email_address')
    print("---> notify_email -------------------")
    print(context)
    print(f"-->{recipient}")
    print("<------------------------------------")
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# Dizionario dati con le chiavi richieste dai DAG di AirFlow
# Single alert recipient used both by Airflow's built-in mailing and the
# custom failure callback below.
my_email = 'abc@xyz.com'
default_args = {
    "owner": 'SG',
    "depends_on_past": False,
    "start_date": dates.days_ago(1),
    "end_date": None,
    # BUG FIX: was the string 'my_email' — Airflow expects a bool here and a
    # non-empty string is silently truthy; make the intent explicit.
    "email_on_failure": True,
    "email_on_retry": False,
    "email": [my_email],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "max_active_runs": 1,
    # partial() pre-binds the config kwarg so Airflow can invoke the callback
    # with the task context as its only positional argument.
    "on_failure_callback": partial(notify_email, config={'email_address': my_email}),
}
dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=default_args, schedule_interval="@once") as ldag:
    # Source (GCS) and destination (BigQuery) coordinates for the load job.
    project = Variable.get("PROJECT")
    source_bucket = 'sg-dev'
    source_object = 'covid19_italy/national_trends_2.csv'
    bq_dataset = "covid19_italy"
    bq_table_name = "national_trends"
    bq_task_id = f'gcs_to_bq_load_{bq_table_name}'
    # One JSON schema file per table, stored alongside the DAG code.
    schema_fields = load_json_file(f"{ROOT_PATH}/source/{bq_dataset}/{bq_table_name}_tabschema.json")
    # CSV -> BigQuery load; failures trigger the on_failure_callback from
    # default_args.
    load_task = GoogleCloudStorageToBigQueryOperator(
        dag=ldag,
        task_id=bq_task_id,
        bucket=source_bucket,
        source_objects=[source_object],
        destination_project_dataset_table=f"{project}.{bq_dataset}.{bq_table_name}",
        schema_fields=schema_fields,
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )
要在失败时调用 notify_email()，只需将 default_args 调整为：
"on_failure_callback": notify_email
那么default_args
应该包含在DAG创建语句中:
with DAG(dag_id='SG-DagDemo-Once', default_args=default_args) as dag:
当某个运算符失败时，您可以尝试类似下面的方法来调用函数 notify_email()；每个运算符都会调用同一个函数（示例取自 gcs_to_bq）：
# NOTE(review): this answer snippet assumes `import airflow`,
# `from airflow.operators import bash_operator` and the
# GoogleCloudStorageToBigQueryOperator import are in scope — they are not
# shown here; confirm before copying.
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    # Same callback for every task in the DAG; fires once per failed task.
    'on_failure_callback': notify_email
}
dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=args, schedule_interval=None) as dag:
    # Create the target BigQuery dataset before loading.
    create_test_dataset = bash_operator.BashOperator(
        task_id='create_airflow_test_dataset',
        bash_command='bq mk airflow_test')
    # [START howto_operator_gcs_to_bq]
    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table='airflow_test.gcs_to_bq_table',
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')
    # [END howto_operator_gcs_to_bq]
    # Tear the example dataset back down so the demo leaves no residue.
    delete_test_dataset = bash_operator.BashOperator(
        task_id='delete_airflow_test_dataset',
        bash_command='bq rm -r -f -d airflow_test')
    create_test_dataset >> load_csv >> delete_test_dataset
您可以通过更改每个运算符的配置来模拟错误。您需要在 notify_email() 中完成发送电子邮件的配置。
我希望在单个 Airflow Operator 失败时收到电子邮件通知。我需要它,因为某些任务的失败不能将整个管道设置为失败。
为了模拟错误,我将源存储桶设置为不存在的存储桶。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Stefano Giostra"
__credits__ = "Stefano Giostra"
__maintainer__ = "Stefano Giostra"
__version__ = "0.9.3"
__status__ = "Dev"
from airflow.models import Variable, DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# from lib.bb_utils import *
import logging
from airflow.utils import dates
from datetime import timedelta
from functools import partial
from lib.bb_utils import load_json_file
from airflow.utils.email import send_email
ROOT_PATH = '/home/airflow/gcs/dags'
logger = logging.getLogger("dag_demo_2")
def notify_email(context, config):  # **kwargs
    """Log the failing task's context and the configured alert recipient.

    Registered as an Airflow ``on_failure_callback`` (bound with
    ``functools.partial`` to supply ``config``).  Airflow passes the task
    ``context`` dict; ``config['email_address']`` names the recipient.
    Actually sending the mail via ``airflow.utils.email.send_email`` is
    still TODO.
    """
    recipient = config.get('email_address')
    print("---> notify_email -------------------")
    print(context)
    print(f"-->{recipient}")
    print("<------------------------------------")
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# Dizionario dati con le chiavi richieste dai DAG di AirFlow
# Single alert recipient used both by Airflow's built-in mailing and the
# custom failure callback below.
my_email = 'abc@xyz.com'
default_args = {
    "owner": 'SG',
    "depends_on_past": False,
    "start_date": dates.days_ago(1),
    "end_date": None,
    # BUG FIX: was the string 'my_email' — Airflow expects a bool here and a
    # non-empty string is silently truthy; make the intent explicit.
    "email_on_failure": True,
    "email_on_retry": False,
    "email": [my_email],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "max_active_runs": 1,
    # partial() pre-binds the config kwarg so Airflow can invoke the callback
    # with the task context as its only positional argument.
    "on_failure_callback": partial(notify_email, config={'email_address': my_email}),
}
dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=default_args, schedule_interval="@once") as ldag:
    # Source (GCS) and destination (BigQuery) coordinates for the load job.
    project = Variable.get("PROJECT")
    source_bucket = 'sg-dev'
    source_object = 'covid19_italy/national_trends_2.csv'
    bq_dataset = "covid19_italy"
    bq_table_name = "national_trends"
    bq_task_id = f'gcs_to_bq_load_{bq_table_name}'
    # One JSON schema file per table, stored alongside the DAG code.
    schema_fields = load_json_file(f"{ROOT_PATH}/source/{bq_dataset}/{bq_table_name}_tabschema.json")
    # CSV -> BigQuery load; failures trigger the on_failure_callback from
    # default_args.
    load_task = GoogleCloudStorageToBigQueryOperator(
        dag=ldag,
        task_id=bq_task_id,
        bucket=source_bucket,
        source_objects=[source_object],
        destination_project_dataset_table=f"{project}.{bq_dataset}.{bq_table_name}",
        schema_fields=schema_fields,
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )
要在失败时调用 notify_email()，只需将 default_args 调整为：
"on_failure_callback": notify_email
那么default_args
应该包含在DAG创建语句中:
with DAG(dag_id='SG-DagDemo-Once', default_args=default_args) as dag:
当某个运算符失败时，您可以尝试类似下面的方法来调用函数 notify_email()；每个运算符都会调用同一个函数（示例取自 gcs_to_bq）：
# NOTE(review): this answer snippet assumes `import airflow`,
# `from airflow.operators import bash_operator` and the
# GoogleCloudStorageToBigQueryOperator import are in scope — they are not
# shown here; confirm before copying.
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    # Same callback for every task in the DAG; fires once per failed task.
    'on_failure_callback': notify_email
}
dag_name = 'SG-DagDemo-Once'
with DAG(dag_id=dag_name, default_args=args, schedule_interval=None) as dag:
    # Create the target BigQuery dataset before loading.
    create_test_dataset = bash_operator.BashOperator(
        task_id='create_airflow_test_dataset',
        bash_command='bq mk airflow_test')
    # [START howto_operator_gcs_to_bq]
    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table='airflow_test.gcs_to_bq_table',
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')
    # [END howto_operator_gcs_to_bq]
    # Tear the example dataset back down so the demo leaves no residue.
    delete_test_dataset = bash_operator.BashOperator(
        task_id='delete_airflow_test_dataset',
        bash_command='bq rm -r -f -d airflow_test')
    create_test_dataset >> load_csv >> delete_test_dataset
您可以通过更改每个运算符的配置来模拟错误。您需要在 notify_email() 中完成发送电子邮件的配置。