如何在 PostGresOperator 气流中的 .sql 文件中传递参数?
How to pass paremeter in my .sql file in PostGresOperator airflow?
我有 sql 文件,我想使用 PostGresOperator 将参数传递给那个 sql 文件。
"""select * from table_{} where id > ID """.format(mytable,myID)
我的 postGresOperator
mport_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql="""
select * from table_{} where id > {}
""".format(mytable,myID)
我如何做同样的事情并在我的 .sql 文件中传递我的参数并仍然使用 .format(mytable,myID) ?
这样我就可以将它们传递到我引用的 .sql 文件中。
如 PostgresOperator
的 How-to Guide 中所述,您可以将 SQL 放在 dag 目录的子文件夹内的文件中:
-- dags/sql/birth_date.sql
SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
使用 params
传递将在文件的 SQL 中呈现的 key/value 对:
get_birth_date = PostgresOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
sql="sql/birth_date.sql",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
编辑
如果您想内联,不使用文件,只需使用任何字符串插值机制:
sql="""select * from table_{} where id > {} """.format(mytable,myID)
或
sql=f"""select * from table_{table_name} where id > {myID} """
或者如果您想使用 jinja,利用任何 default vairables,例如触发 DAG 时提供的参数,您可以这样做:
sql=f"""select * from table_{{ params.param1 }} where id > {myID} """
我有 sql 文件,我想使用 PostGresOperator 将参数传递给那个 sql 文件。
"""select * from table_{} where id > ID """.format(mytable,myID)
我的 postGresOperator
mport_redshift_table = PostgresOperator(
task_id='copy_data_from_redshift_{}'.format(country),
postgres_conn_id='postgres_default',
sql="""
select * from table_{} where id > {}
""".format(mytable,myID)
我如何做同样的事情并在我的 .sql 文件中传递我的参数并仍然使用 .format(mytable,myID) ?
这样我就可以将它们传递到我引用的 .sql 文件中。
如 PostgresOperator
的 How-to Guide 中所述,您可以将 SQL 放在 dag 目录的子文件夹内的文件中:
-- dags/sql/birth_date.sql
SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
使用 params
传递将在文件的 SQL 中呈现的 key/value 对:
get_birth_date = PostgresOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
sql="sql/birth_date.sql",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
编辑
如果您想内联,不使用文件,只需使用任何字符串插值机制:
sql="""select * from table_{} where id > {} """.format(mytable,myID)
或
sql=f"""select * from table_{table_name} where id > {myID} """
或者如果您想使用 jinja,利用任何 default vairables,例如触发 DAG 时提供的参数,您可以这样做:
sql=f"""select * from table_{{ params.param1 }} where id > {myID} """