Spark Dataframe 在一对行上滑动 window

Spark Dataframe sliding window over pair of rows

我有一个 csv 格式的事件日志,包含三列 timestampeventIduserId

我想做的是将新列 nextEventId 添加到数据框。

事件日志示例:

eventlog = sqlContext.createDataFrame(Array((20160101, 1, 0),(20160102,3,1),(20160201,4,1),(20160202, 2,0))).toDF("timestamp", "eventId", "userId")
eventlog.show(4)

|timestamp|eventId|userId|
+---------+-------+------+
| 20160101|      1|     0|
| 20160102|      3|     1|
| 20160201|      4|     1|
| 20160202|      2|     0|
+---------+-------+------+

期望的最终结果是:

|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101|      1|     0|          2|
| 20160102|      3|     1|          4|
| 20160201|      4|     1|        Nil|
| 20160202|      2|     0|        Nil|
+---------+-------+------+-----------+

到目前为止,我一直在搞乱滑动 windows,但不知道如何比较 2 行...

val w = Window.partitionBy("userId").orderBy(asc("timestamp")) //should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) //should work if there are only 2 rows

您要查找的是 lead(或 lag)。使用 window 您已经定义:

import org.apache.spark.sql.functions.lead

eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))

对于真正的滑动 window(如滑动平均值),您可以使用 window 定义的 rowsBetweenrangeBetween 子句,但这里并不是真正需要的。尽管如此,示例用法可能是这样的:

val w2 =  Window.partitionBy("userId")
  .orderBy(asc("timestamp"))
  .rowsBetween(-1, 0)

avg($"foo").over(w2)