Apache Airflow 2.0.0.b2 - 动态 EmailOperator [文件] 属性
Apache Airflow 2.0.0.b2 - Dynamic EmailOperator [files] property
TL;DR 如何创建动态 EmailOperator 以从 XCom 属性
的文件路径发送文件
大家好,
我正在使用 Apache Airflow 2.0.0.b2。我的问题是我的 DAG 创建了一个名称在运行时更改的文件。我想通过电子邮件发送此文件,但在将动态文件名输入我的 EmailOperator 时遇到问题。
我尝试过但失败的事情!:
为 files
使用模板 属性。
files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],
不幸的是,只有当运算符中的字段被标记为模板时,模板才有效。 files
不是 EmailOperator
上的模板化字段
使用函数动态创建我的任务
def get_email_operator(?...):
export_file_path = ti.xcom_pull(key='OUTPUT_CSV')
email_subject = 'Some Subject'
return EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[export_file_path,],
html_content='<br>',
dag=current_dag)
..task3 >> get_email_operator() >> task4
不幸的是,我似乎无法弄清楚如何将当前 **kwargs
或 ti
信息传递到我的函数调用中以获取当前文件路径。
编辑: Elad 下面的回答让我朝着正确的方向前进。我唯一要做的就是在调用 op.execute()
时添加 kwargs
解法:
def get_email_operator(**kwargs):
export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
email_subject = 'Termed Drivers - ' + date_string
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[export_file_path,],
html_content='<br>')
op.execute(kwargs)
文件将在 Airflow 2 中模板化,因为 PR 上周已合并。
但是您无需等待,您可以使用您自己的自定义运算符包装当前运算符,指定模板化字段列表。
喜欢:
class MyEmailOperator(EmailOperator):
template_fields = ('to', 'subject', 'html_content', 'files')
然后您可以在代码中使用 MyEmailOperator
。
files
将被模板化。
您也可以使用包装 EmailOperator 的 PythonOperator 来解决这个问题:
def get_email_operator(**context):
xcom = context['ti'].xcom_pull(task_ids='OUTPUT_CSV')
email_subject = 'Some Subject'
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[xcom,],
html_content='<br>')
op.execute(context)
python = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=get_email_operator,
provide_context=True
)
..task3 >> python >> task4
TL;DR 如何创建动态 EmailOperator 以从 XCom 属性
的文件路径发送文件大家好,
我正在使用 Apache Airflow 2.0.0.b2。我的问题是我的 DAG 创建了一个名称在运行时更改的文件。我想通过电子邮件发送此文件,但在将动态文件名输入我的 EmailOperator 时遇到问题。
我尝试过但失败的事情!:
为
files
使用模板 属性。files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],
不幸的是,只有当运算符中的字段被标记为模板时,模板才有效。
上的模板化字段files
不是 EmailOperator使用函数动态创建我的任务
def get_email_operator(?...): export_file_path = ti.xcom_pull(key='OUTPUT_CSV') email_subject = 'Some Subject' return EmailOperator( task_id="get_email_operator", to=['someemail@somedomain.net'], subject=email_subject, files=[export_file_path,], html_content='<br>', dag=current_dag) ..task3 >> get_email_operator() >> task4
不幸的是,我似乎无法弄清楚如何将当前
**kwargs
或ti
信息传递到我的函数调用中以获取当前文件路径。
编辑: Elad 下面的回答让我朝着正确的方向前进。我唯一要做的就是在调用 op.execute()
时添加kwargs
解法:
def get_email_operator(**kwargs):
export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
email_subject = 'Termed Drivers - ' + date_string
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[export_file_path,],
html_content='<br>')
op.execute(kwargs)
文件将在 Airflow 2 中模板化,因为 PR 上周已合并。
但是您无需等待,您可以使用您自己的自定义运算符包装当前运算符,指定模板化字段列表。
喜欢:
class MyEmailOperator(EmailOperator):
template_fields = ('to', 'subject', 'html_content', 'files')
然后您可以在代码中使用 MyEmailOperator
。
files
将被模板化。
您也可以使用包装 EmailOperator 的 PythonOperator 来解决这个问题:
def get_email_operator(**context):
xcom = context['ti'].xcom_pull(task_ids='OUTPUT_CSV')
email_subject = 'Some Subject'
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[xcom,],
html_content='<br>')
op.execute(context)
python = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=get_email_operator,
provide_context=True
)
..task3 >> python >> task4