UDAF to merge rows that are first ordered by timestamp in a Spark DataSet/DataFrame

Suppose we have a Dataset/DataFrame in Spark with 3 columns: ID, Word and Timestamp.

I want to write a UDAF with which I can do something like this:

df.show()

ID | Word    | Timestamp
1  | I       | "2017-1-1 00:01"
1  | am      | "2017-1-1 00:02"
1  | Chris   | "2017-1-1 00:03"
2  | I       | "2017-1-1 00:01"
2  | am      | "2017-1-1 00:02"
2  | Jessica | "2017-1-1 00:03"

val df_merged = df.groupBy("ID")
  .sort("ID", "Timestamp")
  .agg(custom_agg("ID", "Word", "Timestamp"))

df_merged.show

ID | Words          | StartTime        | EndTime
1  | "I am Chris"   | "2017-1-1 00:01" | "2017-1-1 00:03"
2  | "I am Jessica" | "2017-1-1 00:01" | "2017-1-1 00:03"

The question is: how do I make sure that the Words column is merged in the correct order inside my UDAF?
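
For reference, here is a minimal sketch that builds the sample data above (the SparkSession setup is an assumption; in spark-shell, spark and its implicits are already available):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("merge-words").getOrCreate()
import spark.implicits._

// Sample rows from the question; Timestamp is kept as a plain string here
val df = Seq(
  (1, "I",       "2017-1-1 00:01"),
  (1, "am",      "2017-1-1 00:02"),
  (1, "Chris",   "2017-1-1 00:03"),
  (2, "I",       "2017-1-1 00:01"),
  (2, "am",      "2017-1-1 00:02"),
  (2, "Jessica", "2017-1-1 00:03")
).toDF("ID", "Word", "Timestamp")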

Sorry, I don't use Scala, but hopefully you can follow the code.

A Window function can do what you want:

from pyspark.sql import Window
from pyspark.sql import functions as f

# Collect all words of each ID, ordered by Timestamp, over an unbounded window
df = df.withColumn('Words', f.collect_list(df['Word']).over(
    Window.partitionBy(df['ID']).orderBy('Timestamp')
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))

Output:

+---+-------+-----------------+----------------+                                
| ID|   Word|        Timestamp|           Words|
+---+-------+-----------------+----------------+
|  1|      I|2017-1-1 00:01:00|  [I, am, Chris]|
|  1|     am|2017-1-1 00:02:00|  [I, am, Chris]|
|  1|  Chris|2017-1-1 00:03:00|  [I, am, Chris]|
|  2|      I|2017-1-1 00:01:00|[I, am, Jessica]|
|  2|     am|2017-1-1 00:02:00|[I, am, Jessica]|
|  2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+

Then groupBy the data above:

df = df.groupBy(df['ID'], df['Words']).agg(
    f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))

Output:

+---+------------+-----------------+-----------------+                          
| ID|       Words|        StartTime|          EndTime|
+---+------------+-----------------+-----------------+
|  1|  I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
|  2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+
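
Since the question is in Scala, a rough DataFrame-API equivalent of the two steps above might look like this (an untested sketch, assuming the df from the question):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Collect the words of each ID in Timestamp order over an unbounded window,
// then reduce each group to a single row with its start and end time
val w = Window.partitionBy("ID").orderBy("Timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df_merged = df
  .withColumn("Words", collect_list(col("Word")).over(w))
  .groupBy(col("ID"), col("Words"))
  .agg(min("Timestamp").as("StartTime"), max("Timestamp").as("EndTime"))
  .withColumn("Words", concat_ws(" ", col("Words")))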

Here is a solution using Spark 2's groupByKey (used on an untyped Dataset, i.e. a DataFrame). The advantage of groupByKey is that you have access to the whole group (you get an Iterator[Row] in mapGroups):

import spark.implicits._ // encoders needed by groupByKey, mapGroups and toDF

df.groupByKey(r => r.getAs[Int]("ID"))
  .mapGroups { case (id, rows) =>
    // sort the rows of each group by their timestamp
    val sorted = rows
      .toVector
      .map(r => (r.getAs[String]("Word"), r.getAs[java.sql.Timestamp]("Timestamp")))
      .sortBy(_._2.getTime)

    (id,
     sorted.map(_._1).mkString(" "),
     sorted.map(_._2).head,
     sorted.map(_._2).last)
  }
  .toDF("ID", "Words", "StartTime", "EndTime")
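
A quick way to try this on the sample data: the mapGroups body reads Timestamp as java.sql.Timestamp, so the string column from the example has to be cast first (the format string below is an assumption based on the sample values, and to_timestamp requires Spark 2.2+):

import org.apache.spark.sql.functions.{col, to_timestamp}

// Cast the string Timestamp column so that getAs[java.sql.Timestamp] works
val typed = df.withColumn("Timestamp", to_timestamp(col("Timestamp"), "yyyy-M-d HH:mm"))

// Running the groupByKey/mapGroups code above on `typed` should give
// (row order within the result is not guaranteed):
// +---+------------+-------------------+-------------------+
// | ID|       Words|          StartTime|            EndTime|
// +---+------------+-------------------+-------------------+
// |  1|  I am Chris|2017-01-01 00:01:00|2017-01-01 00:03:00|
// |  2|I am Jessica|2017-01-01 00:01:00|2017-01-01 00:03:00|
// +---+------------+-------------------+-------------------+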