参数中的 Airflow Jinja 模板
Airflow Jinja Templating in params
我有一个 Airflow 运算符,它允许我查询接受 Jinja 模板文件作为查询输入的 Athena。通常,我将 table/database 名称等变量传递给创建 table 的模板并添加分区语句。这适用于定义的字符串。
我的任务定义如下所示:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1='2020'
p2='1'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
正在模板化的 SQL 文件如下所示:
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')
这个模板工作正常。
对此的扩展是允许 'partition1' 和 'partition2' 由包含 XCOM 拉取的 jinja 模板变量定义,该变量来自将日期转换为财政年度和期间的早期任务。可以使用日期作为分区,但我对是否可以强制参数接受 Jinja 模板感兴趣。
我想使用的代码如下所示:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p1") }}'
p2 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p2") }}'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
现在 params.p1 和 params.p2 包含一个 Jinja 模板。显然,params 不支持 jinja 模板,因为呈现的 SQL 包含字符串文字 '{{ task_instance...' 而不是呈现的 XCOM 值。
在运算符实现中向 template_fields 添加参数不足以强制其呈现模板。我的操作员仅扩展 BaseOperator 并使用扩展 AwsHook 的 AthenaHook。
有没有人有在类似结构或替代方法的参数中传递模板化变量的经验?
由于 AWSAthenaOperator 将 query
作为模板字段并接受文件扩展名 .sql
,您可以在文件本身中包含 jinja 模板。
我稍微修改了您的 AWSAthenaOperator 以适合示例。
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={
'database': db,
'table_name': table,
}
)
这是 add_partition.sql
的样子。
INSERT OVERWRITE TABLE {{ params.database }}.{{ params.table_name }} (day, month, year)
SELECT * FROM db.table
WHERE p1 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p1') }}"
AND p2 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p2') }}"
;
我有一个 Airflow 运算符,它允许我查询接受 Jinja 模板文件作为查询输入的 Athena。通常,我将 table/database 名称等变量传递给创建 table 的模板并添加分区语句。这适用于定义的字符串。
我的任务定义如下所示:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1='2020'
p2='1'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
正在模板化的 SQL 文件如下所示:
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')
这个模板工作正常。
对此的扩展是允许 'partition1' 和 'partition2' 由包含 XCOM 拉取的 jinja 模板变量定义,该变量来自将日期转换为财政年度和期间的早期任务。可以使用日期作为分区,但我对是否可以强制参数接受 Jinja 模板感兴趣。
我想使用的代码如下所示:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p1") }}'
p2 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p2") }}'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
现在 params.p1 和 params.p2 包含一个 Jinja 模板。显然,params 不支持 jinja 模板,因为呈现的 SQL 包含字符串文字 '{{ task_instance...' 而不是呈现的 XCOM 值。
在运算符实现中向 template_fields 添加参数不足以强制其呈现模板。我的操作员仅扩展 BaseOperator 并使用扩展 AwsHook 的 AthenaHook。 有没有人有在类似结构或替代方法的参数中传递模板化变量的经验?
由于 AWSAthenaOperator 将 query
作为模板字段并接受文件扩展名 .sql
,您可以在文件本身中包含 jinja 模板。
我稍微修改了您的 AWSAthenaOperator 以适合示例。
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={
'database': db,
'table_name': table,
}
)
这是 add_partition.sql
的样子。
INSERT OVERWRITE TABLE {{ params.database }}.{{ params.table_name }} (day, month, year)
SELECT * FROM db.table
WHERE p1 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p1') }}"
AND p2 = "{{ task_instance.xcom_pull(task_ids='convert_to_partition', key='p2') }}"
;