基于另一列(在本例中为用户)将列索引添加到数据框

Add column index to dataframe based on another column (user in this case)

我有一个如下所示的数据框,其中最后一列表示用户搜索位置和停留的次数

|  Hanks|         Rotterdam|      airbnb7|                     1|
|Sanders|         Rotterdam|      airbnb2|                     1|
|  Hanks|         Rotterdam|      airbnb2|                     3|
|  Hanks|             Tokyo|      airbnb8|                     2|
|  Larry|             Hanoi|             |                     2|
|  Mango|             Seoul|      airbnb5|                     1|
|  Larry|             Hanoi|      airbnb1|                     2|

我想转换如下

|  Hanks|         Rotterdam|      airbnb7|                     1|    1|
|Sanders|         Rotterdam|      airbnb2|                     1|    1|
|  Hanks|         Rotterdam|      airbnb2|                     3|    2|
|  Hanks|             Tokyo|      airbnb8|                     2|    3|
|  Larry|             Hanoi|             |                     2|    0|
|  Mango|             Seoul|      airbnb5|                     1|    1|
|  Larry|             Hanoi|      airbnb1|                     2|    1|

请注意,第 5 列表示用户选择的唯一选项组合(位置+住宿)的索引。 例如

|  Hanks|         Rotterdam|      airbnb7|                     1|    1|
|  Hanks|         Rotterdam|      airbnb2|                     3|    2|
|  Hanks|             Tokyo|      airbnb8|                     2|    3|

我尝试使用 groupBy/Agg 通过在 agg 函数中实现如下的 udf 函数来做到这一点。

val df2 = df1.groupBy("User", "clickedDestination", "clickedAirbnb")
                      .agg(indexUserDetailsUDF(col("clickedAirbnb")) as ("clickedAirbnbIndex"))

而udf如下

var cnt = 0
val airbnbClickIndex:(String) => String = (airbnb) => {
  if(airbnb== "") "null" //return 0 for airbnbClickIndex when airbnb is empty
  else{cnt+=1; cnt.toString()} //otherwise return incremented value
}
val indexUserDetailsUDF = udf(airbnbClickIndex)

但这不起作用。非常感谢任何输入。 谢谢。

更新 1:Daniel dense_rank 的建议对用户

执行以下操作
|Meera|         Amsterdam|     airbnb12|         1|     1|
|Meera|         Amsterdam|      airbnb2|         1|     2|
|Meera|         Amsterdam|      airbnb7|         1|     3|
|Meera|         Amsterdam|      airbnb8|         1|     4|
|Meera|         Bangalore|             |         1|     5|
|Meera|         Bangalore|     airbnb11|         1|     6|
|Meera|         Bangalore|      airbnb8|         1|     7|
|Meera|             Hanoi|      airbnb1|         2|     8|
|Meera|             Hanoi|      airbnb2|         1|     9|
|Meera|             Hanoi|      airbnb7|         1|    10|
|Meera|            Mumbai|             |         1|    11|
|Meera|              Oslo|             |         2|    12|
|Meera|              Oslo|      airbnb8|         1|    13|
|Meera|             Paris|             |         1|    14|
|Meera|             Paris|     airbnb11|         1|    15|
|Meera|             Paris|      airbnb6|         1|    16|
|Meera|             Paris|      airbnb7|         1|    17|
|Meera|             Paris|      airbnb8|         2|    18|
|Meera|         Rotterdam|      airbnb2|         1|    19|

我假设 dense_rank 会将那些具有空字段值(在本例中为第 3 个空字段)的记录推到最后。这是正确的吗?

如果我没猜错,你可能想要 windowed 排名。您可以尝试以下方法:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("User").orderBy("User", "clickedDestination", "clickedAirbnb")

val result = df.withColumn("clickedAirbnbIndex", dense_rank().over(window))

如果需要,您可以在 Spark here.

中找到一些关于 window 函数的好读物

此外,functions package api documentation 非常有用。