在 pyspark 代码中实现 SAS 的保留功能
Implementing retain functionality of SAS in pyspark code
我正在尝试将一段包含保留功能和 SAS 中的多个 if-else 语句的代码转换为 pyspark。我在尝试搜索类似答案时没有运气。
输入数据集:
Prod_Code
评分
排名
ADAMAJ091
1234.0091
1
ADAMAJ091
1222.0001
2
ADAMAJ091
1222.0000
3
BASSDE012
5221.0123
1
BASSDE012
5111.0022
2
BASSDE012
5110.0000
3
我使用 df.withColumn("rank", row_number().over(window.partitionBy('Prod_code'))).orderBy('Rate')
函数
计算了排名
Rate
列中的值必须复制到包含 1 到 N
等级的分区中的所有其他值
预期输出数据集:
Prod_Code
评分
排名
ADAMAJ091
1234.0091
1
ADAMAJ091
1234.0091
2
ADAMAJ091
1234.0091
3
BASSDE012
5221.0123
1
BASSDE012
5221.0123
2
BASSDE012
5221.0123
3
Rate
位于 rank=1 的列的值必须复制到同一分区中的所有其他行。这是保留功能,我需要帮助在 Pyspark 代码中复制相同的功能。
我尝试对单个行使用 df.withColumn()
方法,但我无法在 pyspark 中实现此功能。
既然你已经有了 Rank 列,那么你可以使用 first
函数来获取第一个 Rate 值window 按 排名 .
排序
from pyspark.sql.functions import first
from pyspark.sql.window import Window
df = df.withColumn('Rate', first('Rate').over(Window.partitionBy('prod_code').orderBy('rank')))
df.show()
# +---------+---------+----+
# |Prod_Code| Rate|Rank|
# +---------+---------+----+
# |BASSDE012|5221.0123| 1|
# |BASSDE012|5221.0123| 2|
# |BASSDE012|5221.0123| 3|
# |ADAMAJ091|1234.0091| 1|
# |ADAMAJ091|1234.0091| 2|
# |ADAMAJ091|1234.0091| 3|
# +---------+---------+----+
我正在尝试将一段包含保留功能和 SAS 中的多个 if-else 语句的代码转换为 pyspark。我在尝试搜索类似答案时没有运气。
输入数据集:
Prod_Code | 评分 | 排名 |
---|---|---|
ADAMAJ091 | 1234.0091 | 1 |
ADAMAJ091 | 1222.0001 | 2 |
ADAMAJ091 | 1222.0000 | 3 |
BASSDE012 | 5221.0123 | 1 |
BASSDE012 | 5111.0022 | 2 |
BASSDE012 | 5110.0000 | 3 |
我使用 df.withColumn("rank", row_number().over(window.partitionBy('Prod_code'))).orderBy('Rate')
函数
计算了排名
Rate
列中的值必须复制到包含 1 到 N
预期输出数据集:
Prod_Code | 评分 | 排名 |
---|---|---|
ADAMAJ091 | 1234.0091 | 1 |
ADAMAJ091 | 1234.0091 | 2 |
ADAMAJ091 | 1234.0091 | 3 |
BASSDE012 | 5221.0123 | 1 |
BASSDE012 | 5221.0123 | 2 |
BASSDE012 | 5221.0123 | 3 |
Rate
位于 rank=1 的列的值必须复制到同一分区中的所有其他行。这是保留功能,我需要帮助在 Pyspark 代码中复制相同的功能。
我尝试对单个行使用 df.withColumn()
方法,但我无法在 pyspark 中实现此功能。
既然你已经有了 Rank 列,那么你可以使用 first
函数来获取第一个 Rate 值window 按 排名 .
from pyspark.sql.functions import first
from pyspark.sql.window import Window
df = df.withColumn('Rate', first('Rate').over(Window.partitionBy('prod_code').orderBy('rank')))
df.show()
# +---------+---------+----+
# |Prod_Code| Rate|Rank|
# +---------+---------+----+
# |BASSDE012|5221.0123| 1|
# |BASSDE012|5221.0123| 2|
# |BASSDE012|5221.0123| 3|
# |ADAMAJ091|1234.0091| 1|
# |ADAMAJ091|1234.0091| 2|
# |ADAMAJ091|1234.0091| 3|
# +---------+---------+----+