基于另一列(在本例中为用户)将列索引添加到数据框
Add column index to dataframe based on another column (user in this case)
我有一个如下所示的数据框,其中最后一列表示用户搜索位置和停留的次数
| Hanks| Rotterdam| airbnb7| 1|
|Sanders| Rotterdam| airbnb2| 1|
| Hanks| Rotterdam| airbnb2| 3|
| Hanks| Tokyo| airbnb8| 2|
| Larry| Hanoi| | 2|
| Mango| Seoul| airbnb5| 1|
| Larry| Hanoi| airbnb1| 2|
我想转换如下
| Hanks| Rotterdam| airbnb7| 1| 1|
|Sanders| Rotterdam| airbnb2| 1| 1|
| Hanks| Rotterdam| airbnb2| 3| 2|
| Hanks| Tokyo| airbnb8| 2| 3|
| Larry| Hanoi| | 2| 0|
| Mango| Seoul| airbnb5| 1| 1|
| Larry| Hanoi| airbnb1| 2| 1|
请注意,第 5 列表示用户选择的唯一选项组合(位置+住宿)的索引。
例如
| Hanks| Rotterdam| airbnb7| 1| 1|
| Hanks| Rotterdam| airbnb2| 3| 2|
| Hanks| Tokyo| airbnb8| 2| 3|
我尝试使用 groupBy/Agg 通过在 agg 函数中实现如下的 udf 函数来做到这一点。
val df2 = df1.groupBy("User", "clickedDestination", "clickedAirbnb")
.agg(indexUserDetailsUDF(col("clickedAirbnb")) as ("clickedAirbnbIndex"))
而udf如下
var cnt = 0
val airbnbClickIndex:(String) => String = (airbnb) => {
if(airbnb== "") "null" //return 0 for airbnbClickIndex when airbnb is empty
else{cnt+=1; cnt.toString()} //otherwise return incremented value
}
val indexUserDetailsUDF = udf(airbnbClickIndex)
但这不起作用。非常感谢任何输入。
谢谢。
更新 1:Daniel dense_rank 的建议对用户
执行以下操作
|Meera| Amsterdam| airbnb12| 1| 1|
|Meera| Amsterdam| airbnb2| 1| 2|
|Meera| Amsterdam| airbnb7| 1| 3|
|Meera| Amsterdam| airbnb8| 1| 4|
|Meera| Bangalore| | 1| 5|
|Meera| Bangalore| airbnb11| 1| 6|
|Meera| Bangalore| airbnb8| 1| 7|
|Meera| Hanoi| airbnb1| 2| 8|
|Meera| Hanoi| airbnb2| 1| 9|
|Meera| Hanoi| airbnb7| 1| 10|
|Meera| Mumbai| | 1| 11|
|Meera| Oslo| | 2| 12|
|Meera| Oslo| airbnb8| 1| 13|
|Meera| Paris| | 1| 14|
|Meera| Paris| airbnb11| 1| 15|
|Meera| Paris| airbnb6| 1| 16|
|Meera| Paris| airbnb7| 1| 17|
|Meera| Paris| airbnb8| 2| 18|
|Meera| Rotterdam| airbnb2| 1| 19|
我假设 dense_rank 会将那些具有空字段值(在本例中为第 3 个空字段)的记录推到最后。这是正确的吗?
如果我没猜错,你可能想要 windowed 排名。您可以尝试以下方法:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("User").orderBy("User", "clickedDestination", "clickedAirbnb")
val result = df.withColumn("clickedAirbnbIndex", dense_rank().over(window))
如果需要,您可以在 Spark here.
中找到一些关于 window 函数的好读物
此外,functions package api documentation 非常有用。
我有一个如下所示的数据框,其中最后一列表示用户搜索位置和停留的次数
| Hanks| Rotterdam| airbnb7| 1|
|Sanders| Rotterdam| airbnb2| 1|
| Hanks| Rotterdam| airbnb2| 3|
| Hanks| Tokyo| airbnb8| 2|
| Larry| Hanoi| | 2|
| Mango| Seoul| airbnb5| 1|
| Larry| Hanoi| airbnb1| 2|
我想转换如下
| Hanks| Rotterdam| airbnb7| 1| 1|
|Sanders| Rotterdam| airbnb2| 1| 1|
| Hanks| Rotterdam| airbnb2| 3| 2|
| Hanks| Tokyo| airbnb8| 2| 3|
| Larry| Hanoi| | 2| 0|
| Mango| Seoul| airbnb5| 1| 1|
| Larry| Hanoi| airbnb1| 2| 1|
请注意,第 5 列表示用户选择的唯一选项组合(位置+住宿)的索引。 例如
| Hanks| Rotterdam| airbnb7| 1| 1|
| Hanks| Rotterdam| airbnb2| 3| 2|
| Hanks| Tokyo| airbnb8| 2| 3|
我尝试使用 groupBy/Agg 通过在 agg 函数中实现如下的 udf 函数来做到这一点。
val df2 = df1.groupBy("User", "clickedDestination", "clickedAirbnb")
.agg(indexUserDetailsUDF(col("clickedAirbnb")) as ("clickedAirbnbIndex"))
而udf如下
var cnt = 0
val airbnbClickIndex:(String) => String = (airbnb) => {
if(airbnb== "") "null" //return 0 for airbnbClickIndex when airbnb is empty
else{cnt+=1; cnt.toString()} //otherwise return incremented value
}
val indexUserDetailsUDF = udf(airbnbClickIndex)
但这不起作用。非常感谢任何输入。 谢谢。
更新 1:Daniel dense_rank 的建议对用户
执行以下操作|Meera| Amsterdam| airbnb12| 1| 1|
|Meera| Amsterdam| airbnb2| 1| 2|
|Meera| Amsterdam| airbnb7| 1| 3|
|Meera| Amsterdam| airbnb8| 1| 4|
|Meera| Bangalore| | 1| 5|
|Meera| Bangalore| airbnb11| 1| 6|
|Meera| Bangalore| airbnb8| 1| 7|
|Meera| Hanoi| airbnb1| 2| 8|
|Meera| Hanoi| airbnb2| 1| 9|
|Meera| Hanoi| airbnb7| 1| 10|
|Meera| Mumbai| | 1| 11|
|Meera| Oslo| | 2| 12|
|Meera| Oslo| airbnb8| 1| 13|
|Meera| Paris| | 1| 14|
|Meera| Paris| airbnb11| 1| 15|
|Meera| Paris| airbnb6| 1| 16|
|Meera| Paris| airbnb7| 1| 17|
|Meera| Paris| airbnb8| 2| 18|
|Meera| Rotterdam| airbnb2| 1| 19|
我假设 dense_rank 会将那些具有空字段值(在本例中为第 3 个空字段)的记录推到最后。这是正确的吗?
如果我没猜错,你可能想要 windowed 排名。您可以尝试以下方法:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("User").orderBy("User", "clickedDestination", "clickedAirbnb")
val result = df.withColumn("clickedAirbnbIndex", dense_rank().over(window))
如果需要,您可以在 Spark here.
中找到一些关于 window 函数的好读物此外,functions package api documentation 非常有用。