Spark: aggregate a set of events per key, including their change timestamps
For the data frame:
+----+--------+-------------------+----+
|user| dt| time_value|item|
+----+--------+-------------------+----+
| id1|20200101|2020-01-01 00:00:00| A|
| id1|20200101|2020-01-01 10:00:00| B|
| id1|20200101|2020-01-01 09:00:00| A|
| id1|20200101|2020-01-01 11:00:00| B|
+----+--------+-------------------+----+
I want to capture all unique items, i.e. collect_set, but retain each item's own time_value:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.types.TimestampType
val timeFormat = "yyyy-MM-dd HH:mm"
val dx = Seq(("id1", "20200101", "2020-01-01 00:00", "A"), ("id1", "20200101","2020-01-01 10:00", "B"), ("id1", "20200101","2020-01-01 9:00", "A"), ("id1", "20200101","2020-01-01 11:00", "B")).toDF("user", "dt","time_value", "item").withColumn("time_value", unix_timestamp(col("time_value"), timeFormat).cast(TimestampType))
dx.show
dx.groupBy("user", "dt").agg(collect_set("item")).show
+----+--------+-----------------+
|user| dt|collect_set(item)|
+----+--------+-----------------+
| id1|20200101| [B, A]|
+----+--------+-----------------+
does not retain the time_value information for when the signal switches from A to B. How can I keep the time-value information for each group of item values?
Can collect_set be combined with a window function to achieve the desired result? So far, the only approach I can think of is to:
- use a window function to identify pairs of events
- filter down to the change events
- aggregate
which requires multiple shuffles. Alternatively, a UDF would be possible (collect_list(sort_array(struct(time_value, item)))), but that also seems clunky; a sketch of that variant follows below.
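For reference, a minimal sketch of that variant, assuming the dx frame from above and spark.implicits._ in scope (which the existing toDF/$-syntax already implies); the allEvents name is only illustrative. Note that no UDF is actually needed, and that sort_array has to wrap collect_list, since it operates on an already-collected array column:
import org.apache.spark.sql.functions.{collect_list, sort_array, struct}

// Collect every (time_value, item) pair per key and sort the array chronologically.
val allEvents = dx
  .groupBy($"user", $"dt")
  .agg(sort_array(collect_list(struct($"time_value", $"item"))).as("events"))
allEvents.show(false)
This keeps every event, including consecutive repeats of the same item, so isolating only the change points still needs the window approach discussed in the answer below.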
Is there a better way?
I would indeed use a window function to isolate the change points; I don't think there is an alternative:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, lead, lit}

val win = Window.partitionBy($"user", $"dt").orderBy($"time_value")
dx
  .orderBy($"time_value")
  // true when this event's item differs from the previous event's item
  .withColumn("item_change_post", coalesce(lag($"item", 1).over(win) =!= $"item", lit(false)))
  // true when the next event is such a change point
  .withColumn("item_change_pre", lead($"item_change_post", 1).over(win))
  .where($"item_change_pre" or $"item_change_post")
  .show()
+----+--------+-------------------+----+----------------+---------------+
|user| dt| time_value|item|item_change_post|item_change_pre|
+----+--------+-------------------+----+----------------+---------------+
| id1|20200101|2020-01-01 09:00:00| A| false| true|
| id1|20200101|2020-01-01 10:00:00| B| true| false|
+----+--------+-------------------+----+----------------+---------------+
Then use groupBy($"user",$"dt").agg(collect_list(struct($"time_value",$"item"))).
I don't think multiple shuffles occur, because you always partition/group by the same keys.
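Put together, the whole pipeline might look like the following sketch (column names as above; the changes name is only illustrative, and sort_array is added just to guarantee chronological order of the collected pairs):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, collect_list, lag, lead, lit, sort_array, struct}

val win = Window.partitionBy($"user", $"dt").orderBy($"time_value")

val changes = dx
  // flag events whose item differs from the previous event's item
  .withColumn("item_change_post", coalesce(lag($"item", 1).over(win) =!= $"item", lit(false)))
  // flag events that immediately precede such a change
  .withColumn("item_change_pre", lead($"item_change_post", 1).over(win))
  .where($"item_change_pre" or $"item_change_post")
  .groupBy($"user", $"dt")
  .agg(sort_array(collect_list(struct($"time_value", $"item"))).as("changes"))
changes.show(false)
Both the window and the groupBy partition by (user, dt), so the same partitioning can be reused and only one shuffle should be needed.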
You could try to make this more efficient by first aggregating the initial data frame to the min/max time_value per item, and then doing the same as above; a sketch follows below.
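A possible sketch of that pre-aggregation (the first_seen/last_seen column names are only illustrative, and this assumes each item forms a single contiguous run per (user, dt); otherwise min/max would merge separate runs):
import org.apache.spark.sql.functions.{max, min}

// Shrink the frame to one row per (user, dt, item) before applying the window logic.
val compacted = dx
  .groupBy($"user", $"dt", $"item")
  .agg(min($"time_value").as("first_seen"), max($"time_value").as("last_seen"))
compacted.show(false)
The window/filter step can then run over this much smaller frame, ordering by first_seen instead of time_value.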