PySpark get min and max dates each time there is a value change
I have data like the one below, and I want to group it by id and, whenever the value changes within each id, get the first and last timestamp.
time id value
1/20/2022 9:46:48.756 London 9
1/20/2022 9:46:48.756 London 9
1/20/2022 9:46:49.146 London 9
1/20/2022 9:46:55.855 Paris 1
1/20/2022 9:46:55.955 Paris 4
1/20/2022 9:46:56.145 Paris 4
1/20/2022 9:46:57.179 London 4
1/20/2022 9:46:58.179 London 4
1/20/2022 9:46:57.455 Chicago 2
1/20/2022 9:46:59.145 Chicago 2
1/20/2022 9:47:04.145 Chicago 2
1/20/2022 9:47:06.145 Detroit 9
1/20/2022 9:47:07.654 Detroit 9
1/20/2022 9:47:08.554 Detroit 9
1/20/2022 9:47:11.144 Atlanta 9
1/20/2022 9:47:11.159 Atlanta 9
1/20/2022 9:47:17.144 California 4
1/20/2022 9:47:25.143 California 4
1/20/2022 9:47:46.143 California 4
1/20/2022 9:47:48.143 California 4
My resulting Spark dataframe should look like this:
id value start_time end_time
London 9 1/20/2022 9:46:48.756 1/20/2022 9:46:49.146
Paris 1 1/20/2022 9:46:55.855 1/20/2022 9:46:55.855
Paris 4 1/20/2022 9:46:55.955 1/20/2022 9:46:56.145
London 4 1/20/2022 9:46:57.179 1/20/2022 9:46:58.179
Chicago 2 1/20/2022 9:46:57.455 1/20/2022 9:47:04.145
Detroit 9 1/20/2022 9:47:06.145 1/20/2022 9:47:08.554
Atlanta 9 1/20/2022 9:47:11.144 1/20/2022 9:47:11.159
California 4 1/20/2022 9:47:17.144 1/20/2022 9:47:48.143
I tried the code below, but it only gives me the max and min when the value changes in the next row.
from pyspark.sql import functions as F, Window

w = Window.partitionBy('id').orderBy('time')
df = (
    data.withColumn('id_changed', (F.col('id') != F.lag('id').over(w)).cast('int'))
        .withColumn('value_changed', (F.col('value') != F.lag('value', 1, 0).over(w)).cast('int'))
        .withColumn('id_group_id', F.sum('id_changed').over(w))
        .withColumn('value_group_id', F.sum('value_changed').over(w))
        .groupBy('id', 'id_group_id', 'value', 'value_group_id')
        .agg(
            F.min('time').alias('start_time'),
            F.max('time').alias('end_time')
        )
        .drop('id_group_id', 'value_group_id')
)
df.show()
Thanks for your help.
If I understood your question correctly, it seems you just want to group by id + value and compute the min/max of time:
from pyspark.sql import functions as F

result = df.groupBy("id", "value").agg(
    F.min("time").alias("start_time"),
    F.max("time").alias("end_time")
)
result.show(truncate=False)
#+----------+-----+---------------------+---------------------+
#|id |value|start_time |end_time |
#+----------+-----+---------------------+---------------------+
#|Atlanta |9 |1/20/2022 9:47:11.144|1/20/2022 9:47:11.159|
#|California|4 |1/20/2022 9:47:17.144|1/20/2022 9:47:48.143|
#|Chicago |2 |1/20/2022 9:46:57.455|1/20/2022 9:47:04.145|
#|Detroit |9 |1/20/2022 9:47:06.145|1/20/2022 9:47:08.554|
#|London |4 |1/20/2022 9:46:57.179|1/20/2022 9:46:58.179|
#|London |9 |1/20/2022 9:46:48.756|1/20/2022 9:46:49.146|
#|Paris |1 |1/20/2022 9:46:55.855|1/20/2022 9:46:55.855|
#|Paris |4 |1/20/2022 9:46:55.955|1/20/2022 9:46:56.145|
#+----------+-----+---------------------+---------------------+
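Two caveats, in case they apply to your real data. First, if time is stored as a string, min/max compare it lexicographically; that happens to work for the sample above, but it breaks across hour or date boundaries (e.g. "10:…" sorts before "9:…"), so it is safer to parse it with to_timestamp first. Second, the simple groupBy merges rows whenever the same id + value combination reappears later after a different value; if each consecutive run must stay separate (which is what your window attempt suggests), you can number the runs with a running sum of change flags. Below is a sketch, assuming the column is literally named time and follows the M/d/yyyy H:mm:ss.SSS pattern shown in your sample:

from pyspark.sql import functions as F, Window

# Parse the string times so ordering and min/max are chronological, not
# lexicographic. The format pattern is an assumption based on the sample rows.
parsed = df.withColumn("ts", F.to_timestamp("time", "M/d/yyyy H:mm:ss.SSS"))

# Flag each row whose value differs from the previous row of the same id,
# then number each consecutive run with a running sum of those flags.
w = Window.partitionBy("id").orderBy("ts")
run_w = w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

runs = (
    parsed
    .withColumn("changed",
                F.coalesce((F.col("value") != F.lag("value").over(w)).cast("int"),
                           F.lit(0)))
    .withColumn("run_id", F.sum("changed").over(run_w))
)

result = (
    runs.groupBy("id", "run_id", "value")
        .agg(F.min("ts").alias("start_time"),
             F.max("ts").alias("end_time"))
        .drop("run_id")
        .orderBy("start_time")
)
result.show(truncate=False)

With the sample rows above this yields the same eight groups as your expected output (with start_time/end_time as proper timestamps), and it would also keep a later London/9 run separate from the first one if one ever appeared.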