如何在 Airflow 中使用 EmailOperator 发送多封电子邮件

How to send multiple emails using EmailOperator in Airflow

我正在尝试在 python 循环中使用 EmailOperator 将单独的电子邮件发送到用户列表,但目前没有发送电子邮件。 dag 没有 return 任何让我相信 EmailOperator 没有被调用的错误。

代码

@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():

    @task   
    def multiple_email_send_test():
        email_list = ['user1@gmail.com', 'user2@gmail.com']
        for i in range(len(email_list)):
            EmailOperator(
                task_id=f'send_success_email_test_no_{i}', 
                to=str(email_list[i]),
                subject='Email Header',
                html_content= f"""
                        Hi {email_list[i]}, <br>
                        <p>This is the body of the email</p>
                        <br> Thank You. <br>
                    """
            ) 
    
    # Dummy Operators
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # The pipeline
    start >> multiple_email_send_test() >> end

dag = multiple_email_send_test_dag()

我错过了什么?

这是在运算符内部调用运算符的案例。 EmailOperator 没有执行——它所做的只是初始化 class 的构造函数。实际发送邮件的逻辑在class.

execute()函数中

不要这样做 - 不良做法

首先让我澄清一下,如果你想让你当前的代码工作,你必须调用 execute() 有关它的更多信息,请参阅 this answer

@task   
def multiple_email_send_test():
    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        e = EmailOperator(
            task_id=f'send_success_email_test_no_{i}', 
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
                """
        ) 
        e.exectue(dict())

你应该怎么解决:

选项 1: 由于 EmailOperator 向所有地址发送 1 封电子邮件,这与您希望的行为不同。您可以创建一个自定义运算符 IndvidualEmailOpeator 来接受电子邮件列表并向每个地址发送单独的邮件。通过这样做,您不需要使用 PythonOperator / 任务装饰器包装运算符,因此您可以直接使用您创建的运算符。

选项 2:

请注意,EmailOperator 只是调用 send_email,因此您可以直接使用该函数而无需运算符,从而避免运算符内部运算符的问题:

@task   
def multiple_email_send_test():
    from airflow.utils.email import send_email
    email_list = ['user1@gmail.com', 'user2@gmail.com']
    for i in range(len(email_list)):
        send_email(
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
                """
        )