How to use PySpark to get the durations below?
Suppose we have a DataFrame (df) defined in PySpark. How can I use PySpark to get the duration between the first Biking action and the last Biking action within the same day, and save the result to a DataFrame with columns such as first_biking_timedetails, last_biking_timedetails, and durations_between_first_last? Note: there can be other actions between the first and last Biking action. Also, if there is only one Biking action in a day, we should not get a duration for it, since there is nothing to compute (e.g. on 3/3/18).
Here is the sample result for the date 3/01/2018:
duration_03_01 = 13:12 (last biking time) - 5:12 (first biking time) = 8 hours
Sample df below:
TimeDetails | Actions
---|---
3/1/18 5:12 | Biking
3/1/18 6:12 | Running
3/1/18 7:12 | Swimming
3/1/18 8:12 | Running
3/1/18 9:12 | Swimming
3/1/18 10:12 | Biking
3/1/18 11:12 | Biking
3/1/18 12:12 | Running
3/1/18 13:12 | Biking
3/2/18 4:12 | Biking
3/2/18 5:12 | Swimming
3/2/18 6:12 | Running
3/2/18 7:12 | Biking
3/2/18 8:12 | Running
3/3/18 4:16 | Biking
3/4/18 5:13 | Running
3/4/18 6:13 | Biking
3/4/18 7:13 | Running
3/4/18 8:13 | Swimming
3/4/18 9:13 | Running
3/4/18 10:13 | Running
3/4/18 11:13 | Biking
Some of my code:
df = spark.createDataFrame(
    [
        ('3/1/2018 5:12','Biking')
        ,('3/1/2018 6:12','Running')
        ,('3/1/2018 7:12','Swimming')
        ,('3/1/2018 8:12','Running')
        ,('3/1/2018 9:12','Swimming')
        ,('3/1/2018 10:12','Biking')
        ,('3/1/2018 11:12','Biking')
        ,('3/1/2018 12:12','Running')
        ,('3/1/2018 13:12','Biking')
        ,('3/2/2018 4:12','Biking')
        ,('3/2/2018 5:12','Swimming')
        ,('3/2/2018 6:12','Running')
        ,('3/2/2018 7:12','Biking')
        ,('3/2/2018 8:12','Running')
        ,('3/3/2018 4:16','Biking')
        ,('3/4/2018 5:13','Biking')
        ,('3/4/2018 6:13','Running')
        ,('3/4/2018 7:13','Running')
        ,('3/4/2018 8:13','Swimming')
        ,('3/4/2018 9:13','Running')
        ,('3/4/2018 10:13','Running')
        ,('3/4/2018 11:13','Biking')
    ], ['TimeDetails','Actions']
)
The sample output should be:
 | First_Biking_time | action_1 | Last_Biking_time | action_2 | Durations_in_Hour
---|---|---|---|---|---
1 | 3/1/18 5:12 | Biking | 3/1/18 13:12 | Biking | 8
2 | 3/2/18 4:12 | Biking | 3/2/18 7:12 | Biking | 3
3 | 3/4/18 6:13 | Biking | 3/4/18 11:13 | Biking | 5
Can someone provide me with some PySpark code for this? Alternatively, is there a way to solve it in PySpark SQL?
Thank you.
Your df:
df = spark.createDataFrame(
[
('3/1/2018 5:12','Biking')
,('3/1/2018 6:12','Running')
,('3/1/2018 7:12','Swimming')
,('3/1/2018 8:12','Running')
,('3/1/2018 9:12','Swimming')
,('3/1/2018 10:12','Biking')
,('3/1/2018 11:12','Biking')
,('3/1/2018 12:12','Running')
,('3/1/2018 13:12','Biking')
,('3/2/2018 4:12','Biking')
,('3/2/2018 5:12','Swimming')
,('3/2/2018 6:12','Running')
,('3/2/2018 7:12','Biking')
,('3/2/2018 8:12','Running')
,('3/3/2018 4:16','Biking')
,('3/4/2018 5:13','Biking')
,('3/4/2018 6:13','Running')
,('3/4/2018 7:13','Running')
,('3/4/2018 8:13','Swimming')
,('3/4/2018 9:13','Running')
,('3/4/2018 10:13','Running')
,('3/4/2018 11:13','Biking')
], ['TimeDetails','Actions']
)
Use a window function. You can apply this solution to the other actions as well (see the helper sketch after the output):
from pyspark.sql import functions as F
from pyspark.sql import Window

# Parse the timestamp string and derive the calendar date
df = df\
    .withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
    .withColumn('date', F.to_date('TimeDetails'))

# One partition per action per day. Every row in a partition shares the same
# 'date', so all rows are ordering peers and the default RANGE frame spans
# the whole partition, letting first()/last() see the entire day.
w = Window.partitionBy('Actions', 'date').orderBy('date')

generic = df\
    .withColumn('first_record', F.first(F.col('TimeDetails'), ignorenulls=True).over(w))\
    .withColumn('last_record', F.last(F.col('TimeDetails'), ignorenulls=True).over(w))\
    .withColumn('Durations_in_Hours', (F.unix_timestamp('last_record') - F.unix_timestamp('first_record')) / 3600)\
    .orderBy('TimeDetails')

biking = generic\
    .filter(F.col('Actions') == 'Biking')\
    .select(F.col('first_record').alias('First_Biking_time'),
            F.col('Actions').alias('action_1'),
            F.col('last_record').alias('Last_Biking_time'),
            F.col('Actions').alias('action_2'),
            F.col('Durations_in_Hours'))\
    .dropDuplicates()\
    .filter(F.col('Durations_in_Hours') != 0)\
    .orderBy('First_Biking_time')
# dropDuplicates collapses the per-row values to one row per day, and the
# != 0 filter drops days with a single Biking action (first == last, e.g. 3/3/18)

biking.show()
Output:
+-------------------+--------+-------------------+--------+------------------+
| First_Biking_time|action_1| Last_Biking_time|action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 05:12:00| Biking|2018-03-01 13:12:00| Biking| 8.0|
|2018-03-02 04:12:00| Biking|2018-03-02 07:12:00| Biking| 3.0|
|2018-03-04 05:13:00| Biking|2018-03-04 11:13:00| Biking| 6.0|
+-------------------+--------+-------------------+--------+------------------+
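Since the window is partitioned by both Actions and date, the generic DataFrame already carries first_record/last_record for every action, not just Biking. As a minimal sketch of how to reuse it for the other actions, the hypothetical helper below (first_last_duration is my own name, not part of the code above) just parameterizes the final selection:

# Assumes `generic` and the `F` import from the solution above.
def first_last_duration(generic_df, action):
    # Reuse the per-(action, day) first/last records already computed by the window
    return generic_df\
        .filter(F.col('Actions') == action)\
        .select(F.col('first_record').alias('First_{}_time'.format(action)),
                F.col('Actions').alias('action_1'),
                F.col('last_record').alias('Last_{}_time'.format(action)),
                F.col('Actions').alias('action_2'),
                F.col('Durations_in_Hours'))\
        .dropDuplicates()\
        .filter(F.col('Durations_in_Hours') != 0)\
        .orderBy('First_{}_time'.format(action))

first_last_duration(generic, 'Running').show()

For example, the call above would produce the same kind of per-day summary for Running.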
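As for the PySpark SQL part of the question: the same result can be obtained without window functions by registering the parsed DataFrame as a temp view and grouping per day. This is a minimal sketch under the assumption that df already has the parsed TimeDetails and date columns from above; the view name activity and the HAVING formulation are my own choices:

# Register the parsed DataFrame so it can be queried with Spark SQL
df.createOrReplaceTempView('activity')

biking_sql = spark.sql("""
    SELECT MIN(TimeDetails) AS First_Biking_time,
           'Biking'         AS action_1,
           MAX(TimeDetails) AS Last_Biking_time,
           'Biking'         AS action_2,
           (UNIX_TIMESTAMP(MAX(TimeDetails))
              - UNIX_TIMESTAMP(MIN(TimeDetails))) / 3600 AS Durations_in_Hours
    FROM activity
    WHERE Actions = 'Biking'
    GROUP BY date
    HAVING COUNT(*) > 1  -- drop days with a single Biking row, e.g. 3/3/18
    ORDER BY First_Biking_time
""")
biking_sql.show()

Here GROUP BY with MIN/MAX stands in for first()/last() over the window, and HAVING COUNT(*) > 1 plays the role of the Durations_in_Hours != 0 filter.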