在 pyspark 代码中实现 SAS 的保留功能

Implementing retain functionality of SAS in pyspark code

我正在尝试将一段包含保留功能和 SAS 中的多个 if-else 语句的代码转换为 pyspark。我在尝试搜索类似答案时没有运气。

输入数据集:

Prod_Code 评分 排名
ADAMAJ091 1234.0091 1
ADAMAJ091 1222.0001 2
ADAMAJ091 1222.0000 3
BASSDE012 5221.0123 1
BASSDE012 5111.0022 2
BASSDE012 5110.0000 3

我使用 df.withColumn("rank", row_number().over(window.partitionBy('Prod_code'))).orderBy('Rate') 函数
计算了排名 Rate 列中的值必须复制到包含 1 到 N

等级的分区中的所有其他值

预期输出数据集:

Prod_Code 评分 排名
ADAMAJ091 1234.0091 1
ADAMAJ091 1234.0091 2
ADAMAJ091 1234.0091 3
BASSDE012 5221.0123 1
BASSDE012 5221.0123 2
BASSDE012 5221.0123 3

Rate 位于 rank=1 的列的值必须复制到同一分区中的所有其他行。这是保留功能,我需要帮助在 Pyspark 代码中复制相同的功能。

我尝试对单个行使用 df.withColumn() 方法,但我无法在 pyspark 中实现此功能。

既然你已经有了 Rank 列,那么你可以使用 first 函数来获取第一个 Rate 值window 按 排名 .

排序
from pyspark.sql.functions import first
from pyspark.sql.window import Window

df = df.withColumn('Rate', first('Rate').over(Window.partitionBy('prod_code').orderBy('rank')))
df.show()

# +---------+---------+----+
# |Prod_Code|     Rate|Rank|
# +---------+---------+----+
# |BASSDE012|5221.0123|   1|
# |BASSDE012|5221.0123|   2|
# |BASSDE012|5221.0123|   3|
# |ADAMAJ091|1234.0091|   1|
# |ADAMAJ091|1234.0091|   2|
# |ADAMAJ091|1234.0091|   3|
# +---------+---------+----+