spark 聚合每个键的事件集,包括它们的更改时间戳

spark aggregate set of events per key including their change timestamps

对于数据帧:

+----+--------+-------------------+----+
|user|      dt|         time_value|item|
+----+--------+-------------------+----+
| id1|20200101|2020-01-01 00:00:00|   A|
| id1|20200101|2020-01-01 10:00:00|   B|
| id1|20200101|2020-01-01 09:00:00|   A|
| id1|20200101|2020-01-01 11:00:00|   B|
+----+--------+-------------------+----+

我想捕获所有独特的项目,即 collect_set,但保留它自己的 time_value

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.types.TimestampType
val timeFormat = "yyyy-MM-dd HH:mm"
val dx = Seq(("id1", "20200101", "2020-01-01 00:00", "A"), ("id1", "20200101","2020-01-01 10:00", "B"), ("id1", "20200101","2020-01-01 9:00", "A"), ("id1", "20200101","2020-01-01 11:00", "B")).toDF("user", "dt","time_value", "item").withColumn("time_value", unix_timestamp(col("time_value"), timeFormat).cast(TimestampType))
dx.show

一个

dx.groupBy("user", "dt").agg(collect_set("item")).show
+----+--------+-----------------+                                               
|user|      dt|collect_set(item)|
+----+--------+-----------------+
| id1|20200101|           [B, A]|
+----+--------+-----------------+

不保留信号从A切换到B时的time_value信息,如何保留item中每组的时间值信息?

是否可以在 window 函数中使用 collect_set 来达到预期的效果?目前,我只能想到:

  1. 使用 window 函数来确定事件对
  2. 过滤以更改事件
  3. 汇总

需要多次随机播放。或者,UDF 是可能的 (collect_list(sort_array(struct(time_value, item)))),但这似乎也很笨拙。

有没有更好的方法?

我确实会使用 window 函数来隔离变化点,我认为没有其他选择:

val win = Window.partitionBy($"user",$"dt").orderBy($"time_value")

dx
.orderBy($"time_value")
.withColumn("item_change_post",coalesce((lag($"item",1).over(win)=!=$"item"),lit(false)))
.withColumn("item_change_pre",lead($"item_change_post",1).over(win))
.where($"item_change_pre" or $"item_change_post")
.show()

+----+--------+-------------------+----+----------------+---------------+
|user|      dt|         time_value|item|item_change_post|item_change_pre|
+----+--------+-------------------+----+----------------+---------------+
| id1|20200101|2020-01-01 09:00:00|   A|           false|           true|
| id1|20200101|2020-01-01 10:00:00|   B|            true|          false|
+----+--------+-------------------+----+----------------+---------------+

然后使用 groupBy($"user",$"dt").agg(collect_list(struct($"time_value",$"item")))

我认为不会发生多次随机播放,因为您总是 partition/group 使用相同的键。

您可以尝试通过将每个 item 的初始数据帧聚合到 min/max time_value 来提高效率,然后执行与上述相同的操作。