如何在 Airflow 中使用 EmailOperator 发送多封电子邮件
How to send multiple emails using EmailOperator in Airflow
我正在尝试在 python 循环中使用 EmailOperator 将单独的电子邮件发送到用户列表,但目前没有发送电子邮件。 dag 没有 return 任何让我相信 EmailOperator 没有被调用的错误。
代码
@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():
@task
def multiple_email_send_test():
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
# Dummy Operators
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
# The pipeline
start >> multiple_email_send_test() >> end
dag = multiple_email_send_test_dag()
我错过了什么?
这是在运算符内部调用运算符的案例。 EmailOperator
没有执行——它所做的只是初始化 class 的构造函数。实际发送邮件的逻辑在class.
的execute()
函数中
不要这样做 - 不良做法:
首先让我澄清一下,如果你想让你当前的代码工作,你必须调用 execute()
有关它的更多信息,请参阅 this answer。
@task
def multiple_email_send_test():
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
e = EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
e.exectue(dict())
你应该怎么解决:
选项 1:
由于 EmailOperator
向所有地址发送 1 封电子邮件,这与您希望的行为不同。您可以创建一个自定义运算符 IndvidualEmailOpeator
来接受电子邮件列表并向每个地址发送单独的邮件。通过这样做,您不需要使用 PythonOperator
/ 任务装饰器包装运算符,因此您可以直接使用您创建的运算符。
选项 2:
请注意,EmailOperator
只是调用 send_email,因此您可以直接使用该函数而无需运算符,从而避免运算符内部运算符的问题:
@task
def multiple_email_send_test():
from airflow.utils.email import send_email
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
send_email(
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
我正在尝试在 python 循环中使用 EmailOperator 将单独的电子邮件发送到用户列表,但目前没有发送电子邮件。 dag 没有 return 任何让我相信 EmailOperator 没有被调用的错误。
代码
@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():
@task
def multiple_email_send_test():
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
# Dummy Operators
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
# The pipeline
start >> multiple_email_send_test() >> end
dag = multiple_email_send_test_dag()
我错过了什么?
这是在运算符内部调用运算符的案例。 EmailOperator
没有执行——它所做的只是初始化 class 的构造函数。实际发送邮件的逻辑在class.
execute()
函数中
不要这样做 - 不良做法:
首先让我澄清一下,如果你想让你当前的代码工作,你必须调用 execute()
有关它的更多信息,请参阅 this answer。
@task
def multiple_email_send_test():
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
e = EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
e.exectue(dict())
你应该怎么解决:
选项 1:
由于 EmailOperator
向所有地址发送 1 封电子邮件,这与您希望的行为不同。您可以创建一个自定义运算符 IndvidualEmailOpeator
来接受电子邮件列表并向每个地址发送单独的邮件。通过这样做,您不需要使用 PythonOperator
/ 任务装饰器包装运算符,因此您可以直接使用您创建的运算符。
选项 2:
请注意,EmailOperator
只是调用 send_email,因此您可以直接使用该函数而无需运算符,从而避免运算符内部运算符的问题:
@task
def multiple_email_send_test():
from airflow.utils.email import send_email
email_list = ['user1@gmail.com', 'user2@gmail.com']
for i in range(len(email_list)):
send_email(
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)