How can I process the following data with PySpark?
Suppose we have a DataFrame (df) defined in PySpark. How can I use PySpark to get the duration between each Biking action and the Running action immediately before it, and save the result to a DataFrame with columns such as biking_timedetails, running_timedetails, the two actions, and the duration?
The following two durations qualify for the date 3/1/2018, because both actions fall within the same date and the Biking action occurs immediately after a Running action (assume only these two pairs for that day):
10:12 (Biking time) - 9:12 (Running time) = 1 hour
13:12 (Biking time) - 12:12 (Running time) = 1 hour
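As a quick sanity check outside Spark, the two subtractions above can be reproduced with plain-Python datetime arithmetic (timestamps copied from the sample data below):

```python
from datetime import datetime

fmt = "%m/%d/%Y %H:%M"
run_1 = datetime.strptime("3/1/2018 9:12", fmt)
bike_1 = datetime.strptime("3/1/2018 10:12", fmt)
run_2 = datetime.strptime("3/1/2018 12:12", fmt)
bike_2 = datetime.strptime("3/1/2018 13:12", fmt)

# Both gaps are exactly one hour.
hours_1 = (bike_1 - run_1).total_seconds() / 3600
hours_2 = (bike_2 - run_2).total_seconds() / 3600
print(hours_1, hours_2)  # 1.0 1.0
```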
A sample of df is below:

TimeDetails | Actions
---|---
3/1/18 5:12 | Biking
3/1/18 6:12 | Running
3/1/18 7:12 | Running
3/1/18 8:12 | Running
3/1/18 9:12 | Running
3/1/18 10:12 | Biking
3/1/18 11:12 | Biking
3/1/18 12:12 | Running
3/1/18 13:12 | Biking
3/2/18 4:12 | Biking
3/2/18 5:12 | Biking
3/2/18 6:12 | Running
3/2/18 7:12 | Biking
3/2/18 8:12 | Running
3/3/18 4:16 | Biking
3/4/18 5:13 | Running
3/4/18 6:13 | Running
3/4/18 7:13 | Running
3/4/18 8:13 | Running
3/4/18 9:13 | Running
3/4/18 10:13 | Running
3/4/18 11:13 | Biking
Below is some of my code:
df = spark.createDataFrame(
    [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Running')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Running')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Biking')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Running')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:12','Biking')
    ], ['TimeDetails','Actions']
)
We only care about the duration between a Running action and the first Biking action after it. Basically, duration = Biking_timestamp - Running_timestamp (only count a Biking that happens after a Running event, and it must be the first Biking action after that Running event).
Notes:
- Both events should happen on the same day
- For example, the "Biking" at 3/3/18 4:16 should not be compared with the "Running" from the previous date; so there is no such duration for 3/3/18 4:16
Sample output:

 | Biking_time | action_1 | Running_time | action_2 | Durations_in_Hour
---|---|---|---|---|---
1 | 3/1/18 10:12 | Biking | 3/1/18 9:12 | Running | 1
2 | 3/1/18 13:12 | Biking | 3/1/18 12:12 | Running | 1
3 | 3/2/18 7:12 | Biking | 3/2/18 6:12 | Running | 1
4 | 3/4/18 11:13 | Biking | 3/4/18 10:13 | Running | 1
Could someone provide some code in PySpark? Thanks a lot.
Your data (the functions import is needed here for the to_timestamp call):

from pyspark.sql import functions as F

df = spark.createDataFrame(
[
('3/1/2018 5:12','Biking')
,('3/1/2018 6:12','Running')
,('3/1/2018 7:12','Running')
,('3/1/2018 8:12','Running')
,('3/1/2018 9:12','Running')
,('3/1/2018 10:12','Biking')
,('3/1/2018 11:12','Biking')
,('3/1/2018 12:12','Running')
,('3/1/2018 13:12','Biking')
,('3/2/2018 4:12','Biking')
,('3/2/2018 5:12','Biking')
,('3/2/2018 6:12','Running')
,('3/2/2018 7:12','Biking')
,('3/2/2018 8:12','Running')
,('3/3/2018 4:16','Biking')
,('3/4/2018 5:13','Biking')
,('3/4/2018 6:13','Running')
,('3/4/2018 7:13','Running')
,('3/4/2018 8:13','Running')
,('3/4/2018 9:13','Running')
,('3/4/2018 10:13','Running')
,('3/4/2018 11:12','Biking')
], ['TimeDetails','Actions']
).withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))
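A note on the format string: 'M/d/y H:m' is a Spark (Java-style) datetime pattern, where M is the month and m is the minute, and the single-letter fields accept unpadded digits such as "3" or "5". For anyone wanting to verify the parse outside Spark, a rough plain-Python equivalent (using strptime codes, which are a different notation) would be:

```python
from datetime import datetime

# Python strptime analogue of Spark's 'M/d/y H:m' pattern.
# CPython's strptime also tolerates non-zero-padded fields.
ts = datetime.strptime("3/1/2018 5:12", "%m/%d/%Y %H:%M")
print(ts)  # 2018-03-01 05:12:00
```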
A solution using window functions:
from pyspark.sql import functions as F, types as T
from pyspark.sql import Window as W
# Window over each calendar day, ordered by timestamp
w = W.partitionBy('date').orderBy(F.asc('TimeDetails'))

df = df\
    .withColumn('date', F.to_date('TimeDetails'))\
    .withColumn('Prev_Action', F.lag(F.col('Actions')).over(w))\
    .withColumn('Prev_Action_ts', F.lag(F.col('TimeDetails')).over(w))\
    .withColumn('Bike_after_Run?', F.when((F.col('Actions')=='Biking') & (F.col('Prev_Action')=='Running'), 1).otherwise(0))\
    .withColumn('Durations_in_Hours', (F.unix_timestamp('TimeDetails') - F.unix_timestamp('Prev_Action_ts'))/3600)\
    .withColumn('Durations_in_Hours', F.round(F.col('Durations_in_Hours'), 0).cast(T.IntegerType()))\
    .filter(F.col('Bike_after_Run?') == 1)\
    .select(F.col('TimeDetails').alias('Biking_time'),
            F.col('Actions').alias('action_1'),
            F.col('Prev_Action_ts').alias('Running_time'),
            F.col('Prev_Action').alias('action_2'),
            'Durations_in_Hours')\
    .orderBy('TimeDetails')
df.show()
+-------------------+--------+-------------------+--------+------------------+
|Biking_time |action_1|Running_time |action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 10:12:00|Biking |2018-03-01 09:12:00|Running |1 |
|2018-03-01 13:12:00|Biking |2018-03-01 12:12:00|Running |1 |
|2018-03-02 07:12:00|Biking |2018-03-02 06:12:00|Running |1 |
|2018-03-04 11:12:00|Biking |2018-03-04 10:13:00|Running |1 |
+-------------------+--------+-------------------+--------+------------------+
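If you want to double-check the window logic without a Spark session, the same "lag within the same day" rule can be sketched in plain Python over the answer's data. This is a minimal reference implementation, not a replacement for the Spark job:

```python
from datetime import datetime

fmt = "%m/%d/%Y %H:%M"
data = [
    ("3/1/2018 5:12", "Biking"), ("3/1/2018 6:12", "Running"),
    ("3/1/2018 7:12", "Running"), ("3/1/2018 8:12", "Running"),
    ("3/1/2018 9:12", "Running"), ("3/1/2018 10:12", "Biking"),
    ("3/1/2018 11:12", "Biking"), ("3/1/2018 12:12", "Running"),
    ("3/1/2018 13:12", "Biking"), ("3/2/2018 4:12", "Biking"),
    ("3/2/2018 5:12", "Biking"), ("3/2/2018 6:12", "Running"),
    ("3/2/2018 7:12", "Biking"), ("3/2/2018 8:12", "Running"),
    ("3/3/2018 4:16", "Biking"), ("3/4/2018 5:13", "Biking"),
    ("3/4/2018 6:13", "Running"), ("3/4/2018 7:13", "Running"),
    ("3/4/2018 8:13", "Running"), ("3/4/2018 9:13", "Running"),
    ("3/4/2018 10:13", "Running"), ("3/4/2018 11:12", "Biking"),
]

rows = sorted((datetime.strptime(t, fmt), a) for t, a in data)
pairs = []
for (prev_ts, prev_act), (ts, act) in zip(rows, rows[1:]):
    # Same rule as the window solution: keep a Biking row whose
    # immediately preceding row, on the same calendar day, is Running.
    if act == "Biking" and prev_act == "Running" and ts.date() == prev_ts.date():
        hours = round((ts - prev_ts).total_seconds() / 3600)
        pairs.append((ts, prev_ts, hours))

for bike, run, h in pairs:
    print(bike, run, h)
```

This reproduces the four rows of the Spark output above, including the last pair (11:12 minus 10:13 is 59 minutes, which rounds to 1 hour) and the exclusion of the 3/3/18 4:16 Biking row, whose preceding Running falls on the previous day.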