如何使用 PySpark 获得特定的持续时间?

How to use PySpark to get certain durations?

假设我们已经在 PySpark 中定义了下面的 DataFrame。以及,如何使用 PySpark 获取第一次骑车事件和驾驶事件(直到下一次驾驶事件)之间的持续时间(以分钟为单位)? (将结果存入dataframe,包括start_time、start_event、end_time、end_event、durations等)

注意:骑车和开车之间可能还有其他事件,比如一个人可以从骑车开始,运行然后开车。

例如,第一个骑车事件和即将到来的驾驶事件之间的持续时间是:

开始时间:事件应为 'biking',操作应为 'start'

结束时间:事件应为 'driving',操作应为 'end'

因此日期“3/01/2018”的持续时间为:8:12 - 5:12 = 3 小时 = 180 分钟

('start_time' -> 时间戳发生在:3/1/2018 5:12 & 'end_time' -> 时间戳发生在:3/1/2018 8:12)

注意:可以有多个骑车事件然后驾驶事件到来,只需要记录第一个骑车事件直到下一个驾驶事件。

日期“3/02/2018”的持续时间为:14:12(结束时间为驾驶结束)- 9:12(开始时间为第一次骑行)= 14:12 - 9:12 = 5 小时 = 300 分钟

TimeDetails Event Action
1 3/1/2018 5:12 Biking start
2 3/1/2018 6:12 Biking end
3 3/1/2018 7:12 Driving start
4 3/1/2018 8:12 Driving end
5 3/2/2018 9:12 Biking start
6 3/2/2018 10:12 Biking end
7 3/2/2018 11:12 Biking start
8 3/2/2018 12:12 Biking end
9 3/2/2018 13:12 Driving start
9 3/2/2018 14:12 Driving end

下面是我的一些代码:

biking_df = df.filter(df.Event == 'Biking' & (df.Action == 'start'))
driving_df = df.filter(df.Event == 'Driving' & (df.Action == 'end'))

有人可以向我提供 PySpark 中的一些代码吗? 非常感谢

您首先必须确定并 select 包含持续时间开始和结束时间的相关行。使用 Window 函数和 lag 您可以将日期放入同一行并计算持续时间:

from pyspark.sql import Window, functions as F

time_order_window = Window.orderBy('TimeDetails')

(
    df
    .withColumn('previous_event', F.lag('Event').over(time_order_window)
    .where(
        (F.col('Event') == 'Driving' & F.col('Action') == 'end') |
        (F.col('Event') == 'Biking' & F.col('previous_event') == 'Driving')
    )
    .withColumn('previous_time', F.lag('TimeDetails').over(time_order_window))
    .withColumn('duration_in_sec', (F.col('TimeDetails').cast('double') - F.col('previous_time').cast('double')))
    .withColumn('duration_in_min', F.col('duration_in_sec') / 60
    .where(F.col('Event') == 'Driving')
)

这假设在一个或多个自行车事件之间始终只有一个驾驶事件(两行)。