如何将 config.py 中的 Python 参数传递给 .sql 文件?

How can I pass a Python parameter in config.py to .sql file?

我正在使用 Python Snowflake 连接器从 Snowflake 中的表中提取数据。这是我的文件结构:

sql
   a.sql
   b.sql
   c.sql
configurations.py
data_extract.py
main.py

这里的 sql 文件夹包含我在 .sql 文件中的所有 sql 查询。我将这些 sql 文件分开放置,因为它们每行都是几行,如果我将它们放入 python 文件中看起来很乱。 configuration.py 包含我想在每次 运行 代码时更改的日期时间参数。它看起来像这样:

START_TIME = '2018-10-01 00:00:00'
END_TIME = '2019-04-01 00:00:00'

我想将这些参数添加到 .sql 文件中。例如a.sql包含以下内容:

DECLARE
  @START_PICKUP_DATE DATE,
  @END_PICKUP_DATE DATE,

SET
  @START_PICKUP_DATE = '2018-10-01'

SET
  @END_PICKUP_DATE = '2019-04-01'

select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
      and supplier_confirmation_id is not null;

我在 python 代码中按以下方式使用 a.sql

def executeSQLScriptsFromFile(filepath):
    # snowflake credentials, replace SECRET with your own
    ctx = snowflake.connector.connect(
        user='S_ANALYTICS_USER',
        account=SECRET_A,
        region='us-east-1',
        warehouse=SECRET_B,
        database=SECRET_C,
        role=SECRET_D,
        password=SECRET_E)

    fd = open(filepath, 'r')
    query = fd.read()
    fd.close()

    cs = ctx.cursor()
    try:
        cur = cs.execute(query)
        df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
    finally:
        cs.close()
    ctx.close()

    return df

def extract_data():
    a_sqlpath = os.path.join(os.getcwd(), 'sql\a.sql')
    a_df = executeSQLScriptsFromFile(a_sqlpath)
    return a_df

问题是我希望 a.sql 文件中的 START_PICKUP_DATE 和 END_PICKUP_DATE 同步并等于 START_TIME 和 END_TIME在 configurations.py 文件中,这样我只需要更改 configurations.py 中的 START_TIME 和 END_TIME 并使用 Snowflake 中的 a.sql 在不同的时间范围内提取数据。

我已经在网上寻找解决方案很长时间了,但仍然找不到适合我的问题的好的解决方案。非常感谢任何可以提供提示的人!

为此,我将使用您的 .sql 文件并将查询提取到带有变量格式说明符的三引号 python 字符串中。然后像导入配置一样将查询导入主脚本:

sql_queries.py:

sql_a = """
DECLARE
  @START_PICKUP_DATE DATE,
  @END_PICKUP_DATE DATE,

SET
  @START_PICKUP_DATE = {START_TIME}

SET
  @END_PICKUP_DATE = {END_TIME}

select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= START_PICKUP_DATE and pickup_datetime < END_PICKUP_DATE
  and supplier_confirmation_id is not null;
"""

main:
from sql_queries import sql_a

print(sql_a.format(configuration.START_TIME, configuration.END_TIME))

您应该能够对 sql 语句进行参数化,这样您无需在 SQL 文件中声明,而只需将其作为执行期间传递的参数即可。

select supplier_confirmation_id, pickup_datetime, dropoff_datetime, pickup_station_distance
from SANDBOX.ZQIAN.V_PDL
where pickup_datetime >= %(START_PICKUP_DATE)s and pickup_datetime < %(END_PICKUP_DATE)s and supplier_confirmation_id is not null;

那么在调用函数的时候,只要将参数START_PICKUP_DATEEND_PICKUP_DATE作为参数传递给execute语句即可。一种方法是执行从参数名称到参数值的映射。 (在这个例子中,我假设你有一个函数可以获取参数值)。

cur = cs.execute(query, {'START_PICKUP_DATE':get_value_from_config('start_pickup'), 'END_PICKUP_DATE':get_value_from_config('end_pickup')})

或者您可以按位置传递它们

cur = cs.execute(query, [get_value_from_config('start_pickup'), get_value_from_config('end_pickup')])

本质上变成了

cur = cs.execute(query, ['2018-10-01 00:00:00','2019-04-01 00:00:00'])