Spark Dataframe 在一对行上滑动 window
Spark Dataframe sliding window over pair of rows
我有一个 csv 格式的事件日志,包含三列 timestamp
、eventId
和 userId
。
我想做的是将新列 nextEventId
添加到数据框。
事件日志示例:
eventlog = sqlContext.createDataFrame(Array((20160101, 1, 0),(20160102,3,1),(20160201,4,1),(20160202, 2,0))).toDF("timestamp", "eventId", "userId")
eventlog.show(4)
|timestamp|eventId|userId|
+---------+-------+------+
| 20160101| 1| 0|
| 20160102| 3| 1|
| 20160201| 4| 1|
| 20160202| 2| 0|
+---------+-------+------+
期望的最终结果是:
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101| 1| 0| 2|
| 20160102| 3| 1| 4|
| 20160201| 4| 1| Nil|
| 20160202| 2| 0| Nil|
+---------+-------+------+-----------+
到目前为止,我一直在搞乱滑动 windows,但不知道如何比较 2 行...
val w = Window.partitionBy("userId").orderBy(asc("timestamp")) //should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) //should work if there are only 2 rows
您要查找的是 lead
(或 lag
)。使用 window 您已经定义:
import org.apache.spark.sql.functions.lead
eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))
对于真正的滑动 window(如滑动平均值),您可以使用 window 定义的 rowsBetween
或 rangeBetween
子句,但这里并不是真正需要的。尽管如此,示例用法可能是这样的:
val w2 = Window.partitionBy("userId")
.orderBy(asc("timestamp"))
.rowsBetween(-1, 0)
avg($"foo").over(w2)
我有一个 csv 格式的事件日志,包含三列 timestamp
、eventId
和 userId
。
我想做的是将新列 nextEventId
添加到数据框。
事件日志示例:
eventlog = sqlContext.createDataFrame(Array((20160101, 1, 0),(20160102,3,1),(20160201,4,1),(20160202, 2,0))).toDF("timestamp", "eventId", "userId")
eventlog.show(4)
|timestamp|eventId|userId|
+---------+-------+------+
| 20160101| 1| 0|
| 20160102| 3| 1|
| 20160201| 4| 1|
| 20160202| 2| 0|
+---------+-------+------+
期望的最终结果是:
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101| 1| 0| 2|
| 20160102| 3| 1| 4|
| 20160201| 4| 1| Nil|
| 20160202| 2| 0| Nil|
+---------+-------+------+-----------+
到目前为止,我一直在搞乱滑动 windows,但不知道如何比较 2 行...
val w = Window.partitionBy("userId").orderBy(asc("timestamp")) //should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) //should work if there are only 2 rows
您要查找的是 lead
(或 lag
)。使用 window 您已经定义:
import org.apache.spark.sql.functions.lead
eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))
对于真正的滑动 window(如滑动平均值),您可以使用 window 定义的 rowsBetween
或 rangeBetween
子句,但这里并不是真正需要的。尽管如此,示例用法可能是这样的:
val w2 = Window.partitionBy("userId")
.orderBy(asc("timestamp"))
.rowsBetween(-1, 0)
avg($"foo").over(w2)