如何使用 PySpark 获得特定的持续时间?
How to use PySpark to get certain durations?
假设我们已经在 PySpark 中定义了下面的 DataFrame。以及,如何使用 PySpark 获取第一次骑车事件和驾驶事件(直到下一次驾驶事件)之间的持续时间(以分钟为单位)? (将结果存入dataframe,包括start_time、start_event、end_time、end_event、durations等)
注意:骑车和开车之间可能还有其他事件,比如一个人可以从骑车开始,运行然后开车。
例如,第一个骑车事件和即将到来的驾驶事件之间的持续时间是:
开始时间:事件应为 'biking',操作应为 'start'
结束时间:事件应为 'driving',操作应为 'end'
因此日期“3/01/2018”的持续时间为:8:12 - 5:12 = 3 小时 = 180 分钟
('start_time' -> 时间戳发生在:3/1/2018 5:12 & 'end_time' -> 时间戳发生在:3/1/2018 8:12)
注意:可以有多个骑车事件然后驾驶事件到来,只需要记录第一个骑车事件直到下一个驾驶事件。
日期“3/02/2018”的持续时间为:14:12(结束时间为驾驶结束)- 9:12(开始时间为第一次骑行)= 14:12 - 9:12 = 5 小时 = 300 分钟
TimeDetails
Event
Action
1
3/1/2018 5:12
Biking
start
2
3/1/2018 6:12
Biking
end
3
3/1/2018 7:12
Driving
start
4
3/1/2018 8:12
Driving
end
5
3/2/2018 9:12
Biking
start
6
3/2/2018 10:12
Biking
end
7
3/2/2018 11:12
Biking
start
8
3/2/2018 12:12
Biking
end
9
3/2/2018 13:12
Driving
start
9
3/2/2018 14:12
Driving
end
下面是我的一些代码:
biking_df = df.filter(df.Event == 'Biking' & (df.Action == 'start'))
driving_df = df.filter(df.Event == 'Driving' & (df.Action == 'end'))
有人可以向我提供 PySpark 中的一些代码吗?
非常感谢
您首先必须确定并 select 包含持续时间开始和结束时间的相关行。使用 Window
函数和 lag
您可以将日期放入同一行并计算持续时间:
from pyspark.sql import Window, functions as F
time_order_window = Window.orderBy('TimeDetails')
(
df
.withColumn('previous_event', F.lag('Event').over(time_order_window)
.where(
(F.col('Event') == 'Driving' & F.col('Action') == 'end') |
(F.col('Event') == 'Biking' & F.col('previous_event') == 'Driving')
)
.withColumn('previous_time', F.lag('TimeDetails').over(time_order_window))
.withColumn('duration_in_sec', (F.col('TimeDetails').cast('double') - F.col('previous_time').cast('double')))
.withColumn('duration_in_min', F.col('duration_in_sec') / 60
.where(F.col('Event') == 'Driving')
)
这假设在一个或多个自行车事件之间始终只有一个驾驶事件(两行)。
假设我们已经在 PySpark 中定义了下面的 DataFrame。以及,如何使用 PySpark 获取第一次骑车事件和驾驶事件(直到下一次驾驶事件)之间的持续时间(以分钟为单位)? (将结果存入dataframe,包括start_time、start_event、end_time、end_event、durations等)
注意:骑车和开车之间可能还有其他事件,比如一个人可以从骑车开始,运行然后开车。
例如,第一个骑车事件和即将到来的驾驶事件之间的持续时间是:
开始时间:事件应为 'biking',操作应为 'start'
结束时间:事件应为 'driving',操作应为 'end'
因此日期“3/01/2018”的持续时间为:8:12 - 5:12 = 3 小时 = 180 分钟
('start_time' -> 时间戳发生在:3/1/2018 5:12 & 'end_time' -> 时间戳发生在:3/1/2018 8:12)
注意:可以有多个骑车事件然后驾驶事件到来,只需要记录第一个骑车事件直到下一个驾驶事件。
日期“3/02/2018”的持续时间为:14:12(结束时间为驾驶结束)- 9:12(开始时间为第一次骑行)= 14:12 - 9:12 = 5 小时 = 300 分钟
TimeDetails | Event | Action | |
---|---|---|---|
1 | 3/1/2018 5:12 | Biking | start |
2 | 3/1/2018 6:12 | Biking | end |
3 | 3/1/2018 7:12 | Driving | start |
4 | 3/1/2018 8:12 | Driving | end |
5 | 3/2/2018 9:12 | Biking | start |
6 | 3/2/2018 10:12 | Biking | end |
7 | 3/2/2018 11:12 | Biking | start |
8 | 3/2/2018 12:12 | Biking | end |
9 | 3/2/2018 13:12 | Driving | start |
9 | 3/2/2018 14:12 | Driving | end |
下面是我的一些代码:
biking_df = df.filter(df.Event == 'Biking' & (df.Action == 'start'))
driving_df = df.filter(df.Event == 'Driving' & (df.Action == 'end'))
有人可以向我提供 PySpark 中的一些代码吗? 非常感谢
您首先必须确定并 select 包含持续时间开始和结束时间的相关行。使用 Window
函数和 lag
您可以将日期放入同一行并计算持续时间:
from pyspark.sql import Window, functions as F
time_order_window = Window.orderBy('TimeDetails')
(
df
.withColumn('previous_event', F.lag('Event').over(time_order_window)
.where(
(F.col('Event') == 'Driving' & F.col('Action') == 'end') |
(F.col('Event') == 'Biking' & F.col('previous_event') == 'Driving')
)
.withColumn('previous_time', F.lag('TimeDetails').over(time_order_window))
.withColumn('duration_in_sec', (F.col('TimeDetails').cast('double') - F.col('previous_time').cast('double')))
.withColumn('duration_in_min', F.col('duration_in_sec') / 60
.where(F.col('Event') == 'Driving')
)
这假设在一个或多个自行车事件之间始终只有一个驾驶事件(两行)。