我们如何在 pyspark 中使用 dense_rank() 函数?

How can we use dense_rank() function in pyspark?

我是 运行 pyspark 脚本,我在其中 运行 sql 查询和创建数据框。 在 sql 查询中有 dense_rank() 函数。由于此查询需要太多时间才能完全执行。

有什么方法可以快速执行查询,或者我们可以在 pyspark 级别处理这个问题吗? pyspark 中是否有任何函数或方法可用于替换 sql 中的 dense_rank()?

SQL:

SELECT  DENSE_RANK() OVER(ORDER BY SOURCE_COLUMN_VALUE) AS SYSTEM_ID,SYSTEM_TABLE_NAME,SOURCE_ID,SOURCE_NAME,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME,SRC_VALUE AS SOURCE_COLUMN_VALUE,IM_INSERT_DT FROM (SELECT ID AS SOURCE_ID,'AMPIL' AS SOURCE_NAME,UPPER(concat(coalesce(addr_line_1,''),';',coalesce(addr_line_2,''),';',coalesce(city_1,''),';',coalesce(state_1,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,''))) as  SOURCE_COLUMN_VALUE,concat(coalesce(addr_line1_src,''),';',coalesce(addr_line2_src,''),';',coalesce(city_src,''),';',coalesce(state_crc,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,'')) as SRC_VALUE,SOURCE_TABLE_NAME,'ADDRESS' AS SYSTEM_TABLE_NAME,SOURCE_COLUMN_NAME,date_format(current_timestamp(),'yyyy-MM-dd hh:mm:ss') as IM_INSERT_DT from (SELECT ID,regexp_replace(addr_line_1,' ','') as addr_line_1,Upper(addr_line_1) as addr_line1_src,regexp_replace(addr_line_2,' ','') as addr_line_2 ,upper(addr_line_2) as addr_line2_src,regexp_replace(UPPER(coalesce(city,meli_city_nm)),' ','') as city_1,UPPER(coalesce(city,meli_city_nm)) as city_src,regexp_replace(coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)),' ','') as state_1, coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)) as state_crc,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then 'USA' when UPPER(coalesce(vw_states_code.country_cd,country)) = 'CANADA' then 'CA' else regexp_replace(UPPER(coalesce(vw_states_code.country_cd,country)),' ','') end as cntry_1,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then regexp_extract(substr(trim(regexp_replace(zip,' ','')),0,5),'^[0-9]{5}$',0) else regexp_replace(zip,' ','') end as zip_1,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME from vw_addr_stg LEFT JOIN (select * from vw_dmn_meli_zip where MELI_LAST_LN = 'L') vw_dmn_meli  on vw_addr_stg.zip=vw_dmn_meli.meli_zip_cd_base LEFT JOIN vw_states_code on (coalesce(meli_stt_provncd,state) = vw_states_code.state_cd or vw_states_code.state_nm = vw_addr_stg.state) LEFT JOIN vw_country_codes on vw_country_codes.country_name = vw_addr_stg.country))

pyspark 中,您可以结合使用 Window 函数和 SQL 函数来获得您想要的结果。我不 SQL 流利,我还没有测试解决方案,但类似的东西可能对你有帮助:

import pyspark.sql.Window as psw
import pyspark.sql.functions as psf

w = psw.Window.partitionBy("SOURCE_COLUMN_VALUE")
df.withColumn("SYSTEM_ID", dense_rank().over(w))

您可以找到 dense_rank here

的文档