气流:将 {{ ds }} 作为参数传递给 PostgresOperator
Airflow: pass {{ ds }} as param to PostgresOperator
我想将执行日期用作我的 sql 文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
但是没用。
dt = '{{ ds }}'
不起作用,因为 Jinja(气流中使用的模板引擎)不处理整个 Dag 定义文件。
对于每个 Operator
都有 Jinja 将处理的字段,它们是运算符本身定义的一部分。
在这种情况下,如果您像这样扩展 PostgresOperator
,则可以使 params
字段(实际上称为 parameters
,请务必更改它)模板化:
class MyPostgresOperator(PostgresOperator):
template_fields = ('sql','parameters')
现在你应该可以做到:
s3_to_redshift = MyPostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
parameters={'file': '{{ ds }}'},
dag=dag
)
PostgresOperator / JDBCOperator 继承自 BaseOperator。
BaseOperator 的输入参数之一是 params:
self.params = params or {} # Available in templates!
因此,您应该能够在不创建新文件的情况下使用它 class:
(即使参数未包含在 template_fields 中)
t1 = JdbcOperator(
task_id='copy',
sql='copy.sql',
jdbc_conn_id='connection_name',
params={'schema_name':'public'},
dag=dag
)
SQL 语句 (copy.sql) 可能如下所示:
copy {{ params.schema_name }}.table_name
from 's3://.../table_name.csv'
iam_role 'arn:aws:iam::<acc_num>:role/<role_name>'
csv
IGNOREHEADER 1
注:
copy.sql 与 DAG 所在的位置相同。
或者
您可以在 "default_args"
中定义 "template_searchpath" 变量,并指定模板文件所在文件夹的绝对路径。
例如:'template_searchpath': '/home/user/airflow/templates/'
我想将执行日期用作我的 sql 文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
但是没用。
dt = '{{ ds }}'
不起作用,因为 Jinja(气流中使用的模板引擎)不处理整个 Dag 定义文件。
对于每个 Operator
都有 Jinja 将处理的字段,它们是运算符本身定义的一部分。
在这种情况下,如果您像这样扩展 PostgresOperator
,则可以使 params
字段(实际上称为 parameters
,请务必更改它)模板化:
class MyPostgresOperator(PostgresOperator):
template_fields = ('sql','parameters')
现在你应该可以做到:
s3_to_redshift = MyPostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
parameters={'file': '{{ ds }}'},
dag=dag
)
PostgresOperator / JDBCOperator 继承自 BaseOperator。
BaseOperator 的输入参数之一是 params:
self.params = params or {} # Available in templates!
因此,您应该能够在不创建新文件的情况下使用它 class:
(即使参数未包含在 template_fields 中)
t1 = JdbcOperator(
task_id='copy',
sql='copy.sql',
jdbc_conn_id='connection_name',
params={'schema_name':'public'},
dag=dag
)
SQL 语句 (copy.sql) 可能如下所示:
copy {{ params.schema_name }}.table_name
from 's3://.../table_name.csv'
iam_role 'arn:aws:iam::<acc_num>:role/<role_name>'
csv
IGNOREHEADER 1
注:
copy.sql 与 DAG 所在的位置相同。
或者
您可以在 "default_args"
中定义 "template_searchpath" 变量,并指定模板文件所在文件夹的绝对路径。
例如:'template_searchpath': '/home/user/airflow/templates/'