PySpark - 添加一个按用户排名的新列
PySpark - Add a new column with a Rank by User
我有这个 PySpark DataFrame
df = pd.DataFrame(np.array([
["aa@gmail.com",2,3], ["aa@gmail.com",5,5],
["bb@gmail.com",8,2], ["cc@gmail.com",9,3]
]), columns=['user','movie','rating'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
user movie rating
aa@gmail.com 2 3
aa@gmail.com 5 5
bb@gmail.com 8 2
cc@gmail.com 9 3
我需要添加一个具有用户排名的新列
我想要这个输出
user movie rating Rank
aa@gmail.com 2 3 1
aa@gmail.com 5 5 1
bb@gmail.com 8 2 2
cc@gmail.com 9 3 3
我该怎么做?
目前确实没有优雅的解决方案。如果必须的话,你可以尝试这样的事情:
lookup = (sparkdf.select("user")
.distinct()
.orderBy("user")
.rdd
.zipWithIndex()
.map(lambda x: x[0] + (x[1], ))
.toDF(["user", "rank"]))
sparkdf.join(lookup, ["user"]).withColumn("rank", col("rank") + 1)
Window 函数替代更简洁:
from pyspark.sql.functions import dense_rank
sparkdf.withColumn("rank", dense_rank().over(w))
但它非常低效,在实践中应该避免。
我有这个 PySpark DataFrame
df = pd.DataFrame(np.array([
["aa@gmail.com",2,3], ["aa@gmail.com",5,5],
["bb@gmail.com",8,2], ["cc@gmail.com",9,3]
]), columns=['user','movie','rating'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
user movie rating
aa@gmail.com 2 3
aa@gmail.com 5 5
bb@gmail.com 8 2
cc@gmail.com 9 3
我需要添加一个具有用户排名的新列
我想要这个输出
user movie rating Rank
aa@gmail.com 2 3 1
aa@gmail.com 5 5 1
bb@gmail.com 8 2 2
cc@gmail.com 9 3 3
我该怎么做?
目前确实没有优雅的解决方案。如果必须的话,你可以尝试这样的事情:
lookup = (sparkdf.select("user")
.distinct()
.orderBy("user")
.rdd
.zipWithIndex()
.map(lambda x: x[0] + (x[1], ))
.toDF(["user", "rank"]))
sparkdf.join(lookup, ["user"]).withColumn("rank", col("rank") + 1)
Window 函数替代更简洁:
from pyspark.sql.functions import dense_rank
sparkdf.withColumn("rank", dense_rank().over(w))
但它非常低效,在实践中应该避免。