如何计算pyspark数据框中特定事件的总持续时间?
How to calculate the total duration of specific event in pyspark dataframe?
假设我们下面有一个 CSV 文件,并且已经在 pyspark 中定义了 schema/DataFrame (test_data)。以及,如何使用 spark SQL(PySpark) 获取所有驾驶事件的总持续时间(以分钟为单位)?
对于下表,总持续时间应为:(6:12 - 5:12
) + (8:12 - 7:12
) = 2 小时 = 120 分钟。
下面是我的一些初始化代码:(如果我不应该使用下面的代码,请纠正我)
df.createOrReplaceTempView("test_data")
df2 = spark.sql("SELECT * from test_data")
TimeDetails
Event
Value
1
3/1/18 5:12
Driving
start
2
3/1/18 6:12
Driving
end
3
3/1/18 7:12
Driving
start
4
3/1/18 8:12
Driving
end
5
3/1/18 9:12
Biking
start
6
3/1/18 10:12
Biking
end
7
3/1/18 11:12
Biking
start
8
3/1/18 0:12
Biking
end
….
有人可以向我提供 PySpark SQL 中的一些代码吗?
谢谢
您需要识别每个驾驶会话,然后分组并减去 end - start
时间戳以找到每个会话的持续时间。最后,将所有持续时间相加以获得总驾驶时间。
- 根据事件过滤数据框
driving
并使用累积条件和创建 session_id
列:
from pyspark.sql import functions as F, Window
# convert TimeDetails column into timestamp if it's not already done
df = df.withColumn("TimeDetails", F.to_timestamp("TimeDetails", "d/M/yy H:mm"))
w = Window.orderBy("TimeDetails")
df = df.filter("Event = 'Driving'").withColumn(
"session_id",
F.sum(F.when(F.col("Value") == "start", 1).otherwise(0)).over(w)
)
df.show()
#+-------------------+-------+-----+----------+
#| TimeDetails| Event|Value|session_id|
#+-------------------+-------+-----+----------+
#|2018-01-03 05:12:00|Driving|start| 1|
#|2018-01-03 06:12:00|Driving| end| 1|
#|2018-01-03 07:12:00|Driving|start| 2|
#|2018-01-03 08:12:00|Driving| end| 2|
#+-------------------+-------+-----+----------+
- 现在按
session_id
分组计算会话持续时间和总和:
result = df.groupBy("session_id").agg(
(
(F.max("TimeDetails").cast("long") - F.min("TimeDetails").cast("long")) / 60
).alias("session_duration")
).select(
F.sum("session_duration").alias("total_drive_duration")
)
result.show()
#+--------------------+
#|total_drive_duration|
#+--------------------+
#| 120.0|
#+--------------------+
假设我们下面有一个 CSV 文件,并且已经在 pyspark 中定义了 schema/DataFrame (test_data)。以及,如何使用 spark SQL(PySpark) 获取所有驾驶事件的总持续时间(以分钟为单位)?
对于下表,总持续时间应为:(6:12 - 5:12
) + (8:12 - 7:12
) = 2 小时 = 120 分钟。
下面是我的一些初始化代码:(如果我不应该使用下面的代码,请纠正我)
df.createOrReplaceTempView("test_data")
df2 = spark.sql("SELECT * from test_data")
TimeDetails | Event | Value | |
---|---|---|---|
1 | 3/1/18 5:12 | Driving | start |
2 | 3/1/18 6:12 | Driving | end |
3 | 3/1/18 7:12 | Driving | start |
4 | 3/1/18 8:12 | Driving | end |
5 | 3/1/18 9:12 | Biking | start |
6 | 3/1/18 10:12 | Biking | end |
7 | 3/1/18 11:12 | Biking | start |
8 | 3/1/18 0:12 | Biking | end |
…. |
有人可以向我提供 PySpark SQL 中的一些代码吗? 谢谢
您需要识别每个驾驶会话,然后分组并减去 end - start
时间戳以找到每个会话的持续时间。最后,将所有持续时间相加以获得总驾驶时间。
- 根据事件过滤数据框
driving
并使用累积条件和创建session_id
列:
from pyspark.sql import functions as F, Window
# convert TimeDetails column into timestamp if it's not already done
df = df.withColumn("TimeDetails", F.to_timestamp("TimeDetails", "d/M/yy H:mm"))
w = Window.orderBy("TimeDetails")
df = df.filter("Event = 'Driving'").withColumn(
"session_id",
F.sum(F.when(F.col("Value") == "start", 1).otherwise(0)).over(w)
)
df.show()
#+-------------------+-------+-----+----------+
#| TimeDetails| Event|Value|session_id|
#+-------------------+-------+-----+----------+
#|2018-01-03 05:12:00|Driving|start| 1|
#|2018-01-03 06:12:00|Driving| end| 1|
#|2018-01-03 07:12:00|Driving|start| 2|
#|2018-01-03 08:12:00|Driving| end| 2|
#+-------------------+-------+-----+----------+
- 现在按
session_id
分组计算会话持续时间和总和:
result = df.groupBy("session_id").agg(
(
(F.max("TimeDetails").cast("long") - F.min("TimeDetails").cast("long")) / 60
).alias("session_duration")
).select(
F.sum("session_duration").alias("total_drive_duration")
)
result.show()
#+--------------------+
#|total_drive_duration|
#+--------------------+
#| 120.0|
#+--------------------+