如何计算pyspark数据框中特定事件的总持续时间?

How to calculate the total duration of specific event in pyspark dataframe?

假设我们下面有一个 CSV 文件,并且已经在 pyspark 中定义了 schema/DataFrame (test_data)。以及,如何使用 spark SQL(PySpark) 获取所有驾驶事件的总持续时间(以分钟为单位)?

对于下表,总持续时间应为:(6:12 - 5:12) + (8:12 - 7:12) = 2 小时 = 120 分钟。

下面是我的一些初始化代码:(如果我不应该使用下面的代码,请纠正我)

df.createOrReplaceTempView("test_data")
df2 = spark.sql("SELECT * from test_data")
TimeDetails Event Value
1 3/1/18 5:12 Driving start
2 3/1/18 6:12 Driving end
3 3/1/18 7:12 Driving start
4 3/1/18 8:12 Driving end
5 3/1/18 9:12 Biking start
6 3/1/18 10:12 Biking end
7 3/1/18 11:12 Biking start
8 3/1/18 0:12 Biking end
….

有人可以向我提供 PySpark SQL 中的一些代码吗? 谢谢

您需要识别每个驾驶会话,然后分组并减去 end - start 时间戳以找到每个会话的持续时间。最后,将所有持续时间相加以获得总驾驶时间。

  • 根据事件过滤数据框 driving 并使用累积条件和创建 session_id 列:
from pyspark.sql import functions as F, Window

# convert TimeDetails column into timestamp if it's not already done
df = df.withColumn("TimeDetails", F.to_timestamp("TimeDetails", "d/M/yy H:mm"))

w = Window.orderBy("TimeDetails")

df = df.filter("Event = 'Driving'").withColumn(
    "session_id",
    F.sum(F.when(F.col("Value") == "start", 1).otherwise(0)).over(w)
)

df.show()
#+-------------------+-------+-----+----------+
#|        TimeDetails|  Event|Value|session_id|
#+-------------------+-------+-----+----------+
#|2018-01-03 05:12:00|Driving|start|         1|
#|2018-01-03 06:12:00|Driving|  end|         1|
#|2018-01-03 07:12:00|Driving|start|         2|
#|2018-01-03 08:12:00|Driving|  end|         2|
#+-------------------+-------+-----+----------+
  • 现在按session_id分组计算会话持续时间和总和:
result = df.groupBy("session_id").agg(
    (
      (F.max("TimeDetails").cast("long") - F.min("TimeDetails").cast("long")) / 60
    ).alias("session_duration")
).select(
    F.sum("session_duration").alias("total_drive_duration")
)

result.show()
#+--------------------+
#|total_drive_duration|
#+--------------------+
#|               120.0|
#+--------------------+