如何将参数传递给 Airflow on_success_callback 和 on_failure_callback
How to pass parameters to Airflow on_success_callback and on_failure_callback
我已经使用 on_success_callback 和 on_failure_callback 实施了关于成功和失败的电子邮件警报。
a context dictionary is passed as a single parameter to this function.
如何将另一个参数传递给这些回调方法?
这是我的代码
from airflow.utils.email import send_email_smtp
def task_success_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Success".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Succeeded on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
def task_failure_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Failed on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
我打算将回调移动到另一个包并将电子邮件地址作为参数传递。
你可以在你的 dag 中定义一个函数来调用你的包中的函数。在调用该函数时,将电子邮件作为参数传递。您可以在 DAG 级别进一步优化它以仅传递电子邮件所需的信息。
from package import outer_task_success_callback
email = 'xyz@example.com'
def task_success_alert(context):
dag_id = context['dag'].dag_id
task_id = context['task_instance']. task_id
outer_task_success_callback(dag_id, task_id, email)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
这将允许您在调用程序包中的函数之前进行自定义。
附带说明一下,airflow 具有 smtp 电子邮件功能。您可以利用这些解决方案,而不是编写自己的解决方案。
您可以创建一个任务,其唯一目的是通过 xcoms
推送配置设置。您可以通过 context
提取配置,因为 task_instance
对象包含在 context
.
中
def push_configuration(ti, params):
ti.xcom_push(key='conn_id', value=params)
def task_success_alert(context):
ti = context.get('ti')
params = ti.xcom_pull(key='params', task_ids='Settings')
...
step0 = PythonOperator(
task_id='Settings',
python_callable=push_configuration,
op_kwargs={'params': params})
step1 = BashOperator(
task_id='step1',
bash_command='pwd',
on_success_callback=task_success_alert)
您可以使用 partial 创建带有预定义参数的函数,例如:
from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')
然后添加新函数作为回调:
on_success_callback=new_task_success_alert
我已经使用 on_success_callback 和 on_failure_callback 实施了关于成功和失败的电子邮件警报。
a context dictionary is passed as a single parameter to this function.
如何将另一个参数传递给这些回调方法?
这是我的代码
from airflow.utils.email import send_email_smtp
def task_success_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Success".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Succeeded on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
def task_failure_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Failed on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
我打算将回调移动到另一个包并将电子邮件地址作为参数传递。
你可以在你的 dag 中定义一个函数来调用你的包中的函数。在调用该函数时,将电子邮件作为参数传递。您可以在 DAG 级别进一步优化它以仅传递电子邮件所需的信息。
from package import outer_task_success_callback
email = 'xyz@example.com'
def task_success_alert(context):
dag_id = context['dag'].dag_id
task_id = context['task_instance']. task_id
outer_task_success_callback(dag_id, task_id, email)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
这将允许您在调用程序包中的函数之前进行自定义。
附带说明一下,airflow 具有 smtp 电子邮件功能。您可以利用这些解决方案,而不是编写自己的解决方案。
您可以创建一个任务,其唯一目的是通过 xcoms
推送配置设置。您可以通过 context
提取配置,因为 task_instance
对象包含在 context
.
def push_configuration(ti, params):
ti.xcom_push(key='conn_id', value=params)
def task_success_alert(context):
ti = context.get('ti')
params = ti.xcom_pull(key='params', task_ids='Settings')
...
step0 = PythonOperator(
task_id='Settings',
python_callable=push_configuration,
op_kwargs={'params': params})
step1 = BashOperator(
task_id='step1',
bash_command='pwd',
on_success_callback=task_success_alert)
您可以使用 partial 创建带有预定义参数的函数,例如:
from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')
然后添加新函数作为回调:
on_success_callback=new_task_success_alert