PySpark get min and max dates each time there is a value change

I have data like the following, and I want to group by id and get the first and last timestamp each time the value changes within each id

time                     id         value
1/20/2022 9:46:48.756   London        9
1/20/2022 9:46:48.756   London        9
1/20/2022 9:46:49.146   London        9
1/20/2022 9:46:55.855   Paris         1
1/20/2022 9:46:55.955   Paris         4
1/20/2022 9:46:56.145   Paris         4
1/20/2022 9:46:57.179   London        4
1/20/2022 9:46:58.179   London        4
1/20/2022 9:46:57.455   Chicago       2
1/20/2022 9:46:59.145   Chicago       2
1/20/2022 9:47:04.145   Chicago       2
1/20/2022 9:47:06.145   Detroit       9
1/20/2022 9:47:07.654   Detroit       9
1/20/2022 9:47:08.554   Detroit       9
1/20/2022 9:47:11.144   Atlanta       9
1/20/2022 9:47:11.159   Atlanta       9
1/20/2022 9:47:17.144   California    4
1/20/2022 9:47:25.143   California    4
1/20/2022 9:47:46.143   California    4
1/20/2022 9:47:48.143   California    4

My resulting Spark dataframe should look like this

id        value     start_time                    end_time
London     9    1/20/2022 9:46:48.756   1/20/2022 9:46:49.146
Paris      1    1/20/2022 9:46:55.855   1/20/2022 9:46:55.855
Paris      4    1/20/2022 9:46:55.955   1/20/2022 9:46:56.145
London     4    1/20/2022 9:46:57.179   1/20/2022 9:46:58.179
Chicago    2    1/20/2022 9:46:57.455   1/20/2022 9:47:04.145
Detroit    9    1/20/2022 9:47:06.145   1/20/2022 9:47:08.554
Atlanta    9    1/20/2022 9:47:11.144   1/20/2022 9:47:11.159
California 4    1/20/2022 9:47:17.144   1/20/2022 9:47:48.143

I tried the code below, but it only gives me the min and max when the value changes in the very next row

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('id').orderBy('time')

df = (
  data
  # flag rows where the id or value differs from the previous row
  .withColumn('id_changed', (F.col('id') != F.lag('id').over(w)).cast('int'))
  .withColumn('value_changed', (F.col('value') != F.lag('value', 1, 0).over(w)).cast('int'))
  # running sums turn the change flags into group ids
  .withColumn('id_group_id', F.sum('id_changed').over(w))
  .withColumn('value_group_id', F.sum('value_changed').over(w))
  .groupBy('id', 'id_group_id', 'value', 'value_group_id')
  .agg(
    F.min('time').alias('start_time'),
    F.max('time').alias('end_time')
  )
  .drop('id_group_id', 'value_group_id')
)

df.show()

Thanks for your help

It looks like you simply want to group by id + value and compute the min/max time, if I understood your question correctly:

from pyspark.sql import functions as F

result = df.groupBy("id", "value").agg(
    F.min("time").alias("start_time"),
    F.max("time").alias("end_time")
)

result.show(truncate=False)
#+----------+-----+---------------------+---------------------+
#|id        |value|start_time           |end_time             |
#+----------+-----+---------------------+---------------------+
#|Atlanta   |9    |1/20/2022 9:47:11.144|1/20/2022 9:47:11.159|
#|California|4    |1/20/2022 9:47:17.144|1/20/2022 9:47:48.143|
#|Chicago   |2    |1/20/2022 9:46:57.455|1/20/2022 9:47:04.145|
#|Detroit   |9    |1/20/2022 9:47:06.145|1/20/2022 9:47:08.554|
#|London    |4    |1/20/2022 9:46:57.179|1/20/2022 9:46:58.179|
#|London    |9    |1/20/2022 9:46:48.756|1/20/2022 9:46:49.146|
#|Paris     |1    |1/20/2022 9:46:55.855|1/20/2022 9:46:55.855|
#|Paris     |4    |1/20/2022 9:46:55.955|1/20/2022 9:46:56.145|
#+----------+-----+---------------------+---------------------+
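
If the same id/value pair can show up again later within an id (e.g. London going back to 9 after being 4), the plain groupBy above would merge those separate runs into one row. In that case you need the gaps-and-islands idea your window attempt was going for: partition by id, flag value changes, and turn the flags into run ids with a running sum. A minimal sketch, assuming the time column sorts chronologically (ideally parse it to a real timestamp first):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# one ordered window per id; parsing time first is safer, e.g.
# F.to_timestamp('time', 'M/d/yyyy H:mm:ss.SSS')
w = Window.partitionBy('id').orderBy('time')

result = (
    df
    # flag rows whose value differs from the previous row of the same id;
    # the first row of each id has no previous row, so mark it as a change too
    .withColumn(
        'value_changed',
        F.when(F.lag('value').over(w).isNull()
               | (F.col('value') != F.lag('value').over(w)), 1).otherwise(0)
    )
    # running sum of the flags gives every consecutive run its own number
    .withColumn('run_id', F.sum('value_changed').over(w))
    .groupBy('id', 'value', 'run_id')
    .agg(
        F.min('time').alias('start_time'),
        F.max('time').alias('end_time')
    )
    .drop('run_id')
)

result.show(truncate=False)

With your sample data this produces the same eight rows as the simple groupBy, but it keeps repeated runs of the same id/value apart instead of collapsing them.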