如何将 config.py 中的 Python 参数传递给 .sql 文件?
How can I pass a Python parameter in config.py to .sql file?
我正在使用 Python Snowflake 连接器从 Snowflake 中的表中提取数据。这是我的文件结构:
sql
a.sql
b.sql
c.sql
configurations.py
data_extract.py
main.py
这里的 sql 文件夹包含我在 .sql 文件中的所有 sql 查询。我将这些 sql 文件分开放置,因为它们每行都是几行,如果我将它们放入 python 文件中看起来很乱。
configuration.py 包含我想在每次 运行 代码时更改的日期时间参数。它看起来像这样:
START_TIME = '2018-10-01 00:00:00'
END_TIME = '2019-04-01 00:00:00'
我想将这些参数添加到 .sql 文件中。例如a.sql包含以下内容:
DECLARE
@START_PICKUP_DATE DATE,
@END_PICKUP_DATE DATE,
SET
@START_PICKUP_DATE = '2018-10-01'
SET
@END_PICKUP_DATE = '2019-04-01'
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
and supplier_confirmation_id is not null;
我在 python 代码中按以下方式使用 a.sql:
def executeSQLScriptsFromFile(filepath):
# snowflake credentials, replace SECRET with your own
ctx = snowflake.connector.connect(
user='S_ANALYTICS_USER',
account=SECRET_A,
region='us-east-1',
warehouse=SECRET_B,
database=SECRET_C,
role=SECRET_D,
password=SECRET_E)
fd = open(filepath, 'r')
query = fd.read()
fd.close()
cs = ctx.cursor()
try:
cur = cs.execute(query)
df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
finally:
cs.close()
ctx.close()
return df
def extract_data():
a_sqlpath = os.path.join(os.getcwd(), 'sql\a.sql')
a_df = executeSQLScriptsFromFile(a_sqlpath)
return a_df
问题是我希望 a.sql 文件中的 START_PICKUP_DATE 和 END_PICKUP_DATE 同步并等于 START_TIME 和 END_TIME在 configurations.py 文件中,这样我只需要更改 configurations.py 中的 START_TIME 和 END_TIME 并使用 Snowflake 中的 a.sql 在不同的时间范围内提取数据。
我已经在网上寻找解决方案很长时间了,但仍然找不到适合我的问题的好的解决方案。非常感谢任何可以提供提示的人!
为此,我将使用您的 .sql 文件并将查询提取到带有变量格式说明符的三引号 python 字符串中。然后像导入配置一样将查询导入主脚本:
sql_queries.py:
sql_a = """
DECLARE
@START_PICKUP_DATE DATE,
@END_PICKUP_DATE DATE,
SET
@START_PICKUP_DATE = {START_TIME}
SET
@END_PICKUP_DATE = {END_TIME}
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
and supplier_confirmation_id is not null;
"""
main:
from sql_queries import sql_a
print(sql_a.format(configuration.START_TIME, configuration.END_TIME))
您应该能够对 sql 语句进行参数化,这样您无需在 SQL 文件中声明,而只需将其作为执行期间传递的参数即可。
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= %(START_PICKUP_DATE)s and pickup_datetime < %(END_PICKUP_DATE)s and supplier_confirmation_id is not null;
那么在调用函数的时候,只要将参数START_PICKUP_DATE
和END_PICKUP_DATE
作为参数传递给execute语句即可。一种方法是执行从参数名称到参数值的映射。 (在这个例子中,我假设你有一个函数可以获取参数值)。
cur = cs.execute(query, {'START_PICKUP_DATE':get_value_from_config('start_pickup'), 'END_PICKUP_DATE':get_value_from_config('end_pickup')})
或者您可以按位置传递它们
cur = cs.execute(query, [get_value_from_config('start_pickup'), get_value_from_config('end_pickup')])
本质上变成了
cur = cs.execute(query, ['2018-10-01 00:00:00','2019-04-01 00:00:00'])
我正在使用 Python Snowflake 连接器从 Snowflake 中的表中提取数据。这是我的文件结构:
sql
a.sql
b.sql
c.sql
configurations.py
data_extract.py
main.py
这里的 sql 文件夹包含我在 .sql 文件中的所有 sql 查询。我将这些 sql 文件分开放置,因为它们每行都是几行,如果我将它们放入 python 文件中看起来很乱。 configuration.py 包含我想在每次 运行 代码时更改的日期时间参数。它看起来像这样:
START_TIME = '2018-10-01 00:00:00'
END_TIME = '2019-04-01 00:00:00'
我想将这些参数添加到 .sql 文件中。例如a.sql包含以下内容:
DECLARE
@START_PICKUP_DATE DATE,
@END_PICKUP_DATE DATE,
SET
@START_PICKUP_DATE = '2018-10-01'
SET
@END_PICKUP_DATE = '2019-04-01'
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
and supplier_confirmation_id is not null;
我在 python 代码中按以下方式使用 a.sql:
def executeSQLScriptsFromFile(filepath):
# snowflake credentials, replace SECRET with your own
ctx = snowflake.connector.connect(
user='S_ANALYTICS_USER',
account=SECRET_A,
region='us-east-1',
warehouse=SECRET_B,
database=SECRET_C,
role=SECRET_D,
password=SECRET_E)
fd = open(filepath, 'r')
query = fd.read()
fd.close()
cs = ctx.cursor()
try:
cur = cs.execute(query)
df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
finally:
cs.close()
ctx.close()
return df
def extract_data():
a_sqlpath = os.path.join(os.getcwd(), 'sql\a.sql')
a_df = executeSQLScriptsFromFile(a_sqlpath)
return a_df
问题是我希望 a.sql 文件中的 START_PICKUP_DATE 和 END_PICKUP_DATE 同步并等于 START_TIME 和 END_TIME在 configurations.py 文件中,这样我只需要更改 configurations.py 中的 START_TIME 和 END_TIME 并使用 Snowflake 中的 a.sql 在不同的时间范围内提取数据。
我已经在网上寻找解决方案很长时间了,但仍然找不到适合我的问题的好的解决方案。非常感谢任何可以提供提示的人!
为此,我将使用您的 .sql 文件并将查询提取到带有变量格式说明符的三引号 python 字符串中。然后像导入配置一样将查询导入主脚本:
sql_queries.py:
sql_a = """
DECLARE
@START_PICKUP_DATE DATE,
@END_PICKUP_DATE DATE,
SET
@START_PICKUP_DATE = {START_TIME}
SET
@END_PICKUP_DATE = {END_TIME}
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
and supplier_confirmation_id is not null;
"""
main:
from sql_queries import sql_a
print(sql_a.format(configuration.START_TIME, configuration.END_TIME))
您应该能够对 sql 语句进行参数化,这样您无需在 SQL 文件中声明,而只需将其作为执行期间传递的参数即可。
select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= %(START_PICKUP_DATE)s and pickup_datetime < %(END_PICKUP_DATE)s and supplier_confirmation_id is not null;
那么在调用函数的时候,只要将参数START_PICKUP_DATE
和END_PICKUP_DATE
作为参数传递给execute语句即可。一种方法是执行从参数名称到参数值的映射。 (在这个例子中,我假设你有一个函数可以获取参数值)。
cur = cs.execute(query, {'START_PICKUP_DATE':get_value_from_config('start_pickup'), 'END_PICKUP_DATE':get_value_from_config('end_pickup')})
或者您可以按位置传递它们
cur = cs.execute(query, [get_value_from_config('start_pickup'), get_value_from_config('end_pickup')])
本质上变成了
cur = cs.execute(query, ['2018-10-01 00:00:00','2019-04-01 00:00:00'])