Apache Airflow 2.0.0.b2 - 动态 EmailOperator [文件] 属性

Apache Airflow 2.0.0.b2 - Dynamic EmailOperator [files] property

TL;DR 如何创建动态 EmailOperator 以从 XCom 属性

的文件路径发送文件

大家好,

我正在使用 Apache Airflow 2.0.0.b2。我的问题是我的 DAG 创建了一个名称在运行时更改的文件。我想通过电子邮件发送此文件,但在将动态文件名输入我的 EmailOperator 时遇到问题。

我尝试过但失败的事情!:

  1. files 使用模板 属性。

    files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],
    

    不幸的是,只有当运算符中的字段被标记为模板时,模板才有效。 files 不是 EmailOperator

    上的模板化字段
  2. 使用函数动态创建我的任务

     def get_email_operator(?...):
        export_file_path = ti.xcom_pull(key='OUTPUT_CSV')
        email_subject = 'Some Subject'
        return EmailOperator(
            task_id="get_email_operator",
            to=['someemail@somedomain.net'],
            subject=email_subject,
            files=[export_file_path,],
            html_content='<br>',
            dag=current_dag)
    
    ..task3 >> get_email_operator() >> task4
    

    不幸的是,我似乎无法弄清楚如何将当前 **kwargsti 信息传递到我的函数调用中以获取当前文件路径。

编辑: Elad 下面的回答让我朝着正确的方向前进。我唯一要做的就是在调用 op.execute()

时添加 kwargs

解法:

def get_email_operator(**kwargs):
    export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
    email_subject = 'Termed Drivers - ' + date_string
    op = EmailOperator(
        task_id="get_email_operator",
        to=['someemail@somedomain.net'],
        subject=email_subject,
        files=[export_file_path,],
        html_content='<br>')
    op.execute(kwargs)

文件将在 Airflow 2 中模板化,因为 PR 上周已合并。

但是您无需等待,您可以使用您自己的自定义运算符包装当前运算符,指定模板化字段列表。

喜欢:

class MyEmailOperator(EmailOperator):
     template_fields = ('to', 'subject', 'html_content', 'files')

然后您可以在代码中使用 MyEmailOperatorfiles 将被模板化。

您也可以使用包装 EmailOperator 的 PythonOperator 来解决这个问题:

def get_email_operator(**context):
    xcom = context['ti'].xcom_pull(task_ids='OUTPUT_CSV')
    email_subject = 'Some Subject'
    op = EmailOperator(
        task_id="get_email_operator",
        to=['someemail@somedomain.net'],
        subject=email_subject,
        files=[xcom,],
        html_content='<br>')
    op.execute(context)

python = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=get_email_operator,
    provide_context=True
)

..task3 >> python >> task4