How to format SQL Queries inside PySpark codefile
I want to format the existing SQL queries in my PySpark files.
This is what my current source file looks like:
from flow import flow
f = flow(["xxx"], ["xxxxxxxx"])
# this is a comment
f.spark.sql(
""" select dealer_number location_path_id, '2099-12-31' location_path_end_date, dealer_to_salespoint(dealer_number) sales_point_id, true can_rollup_owner, dealer_number entity, 5 as location_level, calendar_date, 'Sales' period_type, coalesce(m.model,'OTH') model_id, 'daily' as cadence, cpo_coverage_code, cpo_contract_status, 'cpo' feed_name from ( select *, row_number() over (partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status order by a.calendar_date asc, a.filekey desc) rn from ( select `DEALER NUMBER` dealer_number, `CONTRACT STATUS` cpo_contract_status,`COVERAGE CODE` cpo_coverage_code, `CONTRACT NUMBER` cpo_contract_number, `vehicle model` cpo_vehicle_model, to_date(`CONTRACT TRANSACTION DATE`) calendar_date, filekey, * from cpo_v1 ) a ) f where f.rn = 1 """
)
This is what I want it to look like:
from flow import flow
f = flow(["xxx"],["abc"],filename=True)
f.spark.sql("""
select
dealer location_path_id,
'2099-12-31' location_path_end_date,
dealer_to_salespoint(dealer_number) sales_point_id,
true can_rollup_owner,
dealer_number entity,
5 as location_level,
calendar_date,
'Sales' period_type,
coalesce(m.model,'OTH') model_id,
'daily' as cadence,
cpo_coverage_code,
cpo_contract_status,
'cpo' feed_name
from (
select *,
row_number() over (partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status
order by a.calendar_date asc, a.filekey desc) rn
from (
select
`DEALER NUMBER` dealer_number,
`CONTRACT STATUS` cpo_contract_status,
`COVERAGE CODE` cpo_coverage_code,
`CONTRACT NUMBER` cpo_contract_number,
`vehicle model` cpo_vehicle_model,
to_date(`CONTRACT TRANSACTION DATE`) calendar_date,
filekey,
*
from cpo_v1
) a
) f
left join (
select
model,
alternate_model_name
from models_v1
lateral view explode(nvl2(alternate_modelname, split(model_name || ',' || alternate_modelname, ","), split(model_name, ","))) as alternate_model_name
) m
on lower(split(f.cpo_vehicle_model,' ')[0]) = lower(m.alternate_model_name)
where
f.rn = 1
"""
).createOrReplaceTempView("xxx")
f.save_view("xxx")
I have already tried formatting my codebase with Black and other VS Code extensions, but without success, since the SQL code is treated as a plain Python string. Please suggest any workaround.
P.S.: My existing codebase contains more than 700 files like this.
One possible option is to use sql-formatter.
Suppose we have a test.py file:
from flow import flow
f = flow(["xxx"], ["xxxxxxxx"])
f.spark.sql(
""" select dealer_number location_path_id, '2099-12-31' location_path_end_date, dealer_to_salespoint(dealer_number) sales_point_id, true can_rollup_owner, dealer_number entity, 5 as location_level, calendar_date, 'Sales' period_type, coalesce(m.model,'OTH') model_id, 'daily' as cadence, cpo_coverage_code, cpo_contract_status, 'cpo' feed_name from ( select *, row_number() over (partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status order by a.calendar_date asc, a.filekey desc) rn from ( select `DEALER NUMBER` dealer_number, `CONTRACT STATUS` cpo_contract_status,`COVERAGE CODE` cpo_coverage_code, `CONTRACT NUMBER` cpo_contract_number, `vehicle model` cpo_vehicle_model, to_date(`CONTRACT TRANSACTION DATE`) calendar_date, filekey, * from cpo_v1 ) a ) f where f.rn = 1 """
)
We can create a script that reads the file in as a string, finds the queries by searching for triple quotes ("""), extracts them, runs them through the formatter, and substitutes the results back:
import re
from sql_formatter.core import format_sql

with open("test.py", "r") as f_in:
    text = f_in.read()

# Non-greedy match with DOTALL, so each (possibly multi-line) query is
# formatted on its own instead of one greedy match spanning the whole file.
text = re.sub('"""(.*?)"""', lambda x: format_sql(x.group()), text, flags=re.DOTALL)

with open("test.py", "w") as f_out:
    f_out.write(text)
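One caveat worth demonstrating: the quantifier in the regex matters as soon as a file contains more than one triple-quoted string. A greedy `(.*)` runs from the first `"""` to the last one, merging separate queries into a single match; a non-greedy `(.*?)` keeps them apart:

```python
import re

# Two separate triple-quoted queries in one source string.
source = 'f.spark.sql("""select 1""")\nf.spark.sql("""select 2""")'

# Greedy (.*) with DOTALL runs from the first """ to the last one,
# merging both queries into a single match.
greedy = re.findall('"""(.*)"""', source, flags=re.DOTALL)

# Non-greedy (.*?) stops at the first closing """, so each query
# is matched on its own.
lazy = re.findall('"""(.*?)"""', source, flags=re.DOTALL)
```

Here `greedy` yields one merged match, while `lazy` yields the two queries separately, which is what the formatter needs.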
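Since the codebase contains 700+ such files, the same substitution can be wrapped in a small helper and applied to a whole tree with pathlib. A minimal sketch, with the caveats that `format_queries` and the `src` directory are illustrative names, and the formatter is passed in as a callable so the helper is not tied to sql-formatter:

```python
import re
from pathlib import Path


def format_queries(path, formatter):
    """Run every triple-quoted string in the file at `path` through `formatter`."""
    text = path.read_text()
    # Non-greedy match with DOTALL, so each (possibly multi-line) query is
    # handled on its own instead of one greedy match spanning the whole file.
    new_text = re.sub('"""(.*?)"""', lambda m: formatter(m.group()), text, flags=re.DOTALL)
    path.write_text(new_text)


# For example, with sql-formatter over every .py file in the project:
#   from sql_formatter.core import format_sql
#   for py_file in Path("src").rglob("*.py"):
#       format_queries(py_file, format_sql)
```

Running this on a checkout under version control makes it easy to review the resulting diffs file by file before committing.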