How to use PySpark to process the data below?

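Suppose we have a DataFrame (df) defined in PySpark. How can I use PySpark to get the duration between a Running action and the Biking action that immediately follows it? The result should be saved in a DataFrame with columns such as the running time details, the Running action, the biking time details, the Biking action, and the duration.

The following two durations fall on 3/1/2018, because both actions occur on the same date and the Biking action happens immediately after the Running action (assume there are only two kinds of action).

10:12 (biking time) - 9:12 (running time) = 1 hour

13:12 (biking time) - 12:12 (running time) = 1 hour

Sample df below: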

TimeDetails Actions
3/1/18 5:12 Biking
3/1/18 6:12 Running
3/1/18 7:12 Running
3/1/18 8:12 Running
3/1/18 9:12 Running
3/1/18 10:12 Biking
3/1/18 11:12 Biking
3/1/18 12:12 Running
3/1/18 13:12 Biking
3/2/18 4:12 Biking
3/2/18 5:12 Biking
3/2/18 6:12 Running
3/2/18 7:12 Biking
3/2/18 8:12 Running
3/3/18 4:16 Biking
3/4/18 5:13 Running
3/4/18 6:13 Running
3/4/18 7:13 Running
3/4/18 8:13 Running
3/4/18 9:13 Running
3/4/18 10:13 Running
3/4/18 11:13 Biking

Below is some of my code:

 df  = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Running')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Running')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Biking')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Running')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:12','Biking')
      ], ['TimeDetails','Actions']
    )

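We only care about the duration between a Running action and the first Biking action after it. Basically, duration = Biking_timestamp - Running_timestamp (only consider a Biking action that occurs after a Running event, and it must be the first Biking action after that Running event).

Notes:

  1. Both events must occur on the same day
  2. For an entry like 3/3/18 4:16 Biking, we do not compare it with a Running action from a previous date; so there is no duration for 3/3/18 4:16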
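The rule described above can be sketched in plain Python (no Spark) as a small reference to cross-check results against. This is only an illustration under stated assumptions: the names `SAMPLE`, `rows`, and `bike_after_run` are hypothetical, and the rows are copied from the sample table in this post.

```python
from datetime import datetime
from itertools import groupby

# Rows copied from the sample table in the question (hypothetical variable name).
SAMPLE = """\
3/1/18 5:12 Biking
3/1/18 6:12 Running
3/1/18 7:12 Running
3/1/18 8:12 Running
3/1/18 9:12 Running
3/1/18 10:12 Biking
3/1/18 11:12 Biking
3/1/18 12:12 Running
3/1/18 13:12 Biking
3/2/18 4:12 Biking
3/2/18 5:12 Biking
3/2/18 6:12 Running
3/2/18 7:12 Biking
3/2/18 8:12 Running
3/3/18 4:16 Biking
3/4/18 5:13 Running
3/4/18 6:13 Running
3/4/18 7:13 Running
3/4/18 8:13 Running
3/4/18 9:13 Running
3/4/18 10:13 Running
3/4/18 11:13 Biking
"""

# Split each line into (timestamp string, action).
rows = [tuple(line.rsplit(" ", 1)) for line in SAMPLE.splitlines()]


def bike_after_run(rows):
    """Return (biking_time, running_time, hours) for every Biking row whose
    immediately preceding row on the same calendar date is Running."""
    parsed = sorted(
        (datetime.strptime(ts, "%m/%d/%y %H:%M"), action) for ts, action in rows
    )
    result = []
    # Group by date so a row is never compared with a row from another day.
    for _, day_rows in groupby(parsed, key=lambda r: r[0].date()):
        day_rows = list(day_rows)
        # Walk consecutive (previous, current) pairs within the day.
        for (prev_ts, prev_action), (ts, action) in zip(day_rows, day_rows[1:]):
            if prev_action == "Running" and action == "Biking":
                hours = (ts - prev_ts).total_seconds() / 3600
                result.append((ts, prev_ts, hours))
    return result


for biking_time, running_time, hours in bike_after_run(rows):
    print(biking_time, running_time, hours)
```

Because the data contains only two kinds of action, the first Biking row after a Running row is always the row directly following it, which is why comparing each row with its predecessor is enough here.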

Sample output:

   Biking_time   action_1  Running_time  action_2  Durations_in_Hour
1  3/1/18 10:12  Biking    3/1/18 9:12   Running   1
2  3/1/18 13:12  Biking    3/1/18 12:12  Running   1
3  3/2/18 7:12   Biking    3/2/18 6:12   Running   1
4  3/4/18 11:13  Biking    3/4/18 10:13  Running   1

Could someone provide some PySpark code for this? Thanks a lot.

Your data:

 from pyspark.sql import functions as F

 df  = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Running')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Running')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Biking')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Running')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:12','Biking')
      ], ['TimeDetails','Actions']
    ).withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))

A solution using Window functions:

from pyspark.sql import functions as F, types as T
from pyspark.sql import Window as W


# Partition by calendar date so each row is only compared with the previous row of the same day.
# The 'date' column is added in the first step of the chain below.
w = W.partitionBy('date').orderBy(F.asc("TimeDetails"))

# lag() fetches the previous action and its timestamp; a row qualifies when it is
# Biking and the previous row is Running. The duration is rounded to whole hours.
df = df\
    .withColumn('date', F.to_date('TimeDetails'))\
    .withColumn("Prev_Action", F.lag(F.col('Actions')).over(w))\
    .withColumn("Prev_Action_ts", F.lag(F.col('TimeDetails')).over(w))\
    .withColumn('Bike_after_Run?', F.when((F.col('Actions')=='Biking') & (F.col('Prev_Action')=='Running'),1).otherwise(0))\
    .withColumn('Durations_in_Hours',(F.unix_timestamp("TimeDetails") - F.unix_timestamp('Prev_Action_ts'))/3600)\
    .withColumn('Durations_in_Hours', F.round(F.col('Durations_in_Hours'),0).cast(T.IntegerType()))\
    .filter(F.col('Bike_after_Run?') == 1)\
    .select(F.col('TimeDetails').alias('Biking_time'),
            F.col('Actions').alias('action_1'),
            F.col('Prev_Action_ts').alias('Running_time'),
            F.col('Prev_Action').alias('action_2'),
            'Durations_in_Hours')\
    .orderBy('Biking_time')

df.show(truncate=False)
+-------------------+--------+-------------------+--------+------------------+
|Biking_time        |action_1|Running_time       |action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 10:12:00|Biking  |2018-03-01 09:12:00|Running |1                 |
|2018-03-01 13:12:00|Biking  |2018-03-01 12:12:00|Running |1                 |
|2018-03-02 07:12:00|Biking  |2018-03-02 06:12:00|Running |1                 |
|2018-03-04 11:12:00|Biking  |2018-03-04 10:13:00|Running |1                 |
+-------------------+--------+-------------------+--------+------------------+
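One detail in the last row is worth spelling out: with the data as entered in the code (11:12 Biking after 10:13 Running), the gap is 59 minutes, and it shows up as 1 only because the duration is rounded to whole hours. The arithmetic can be checked with a small plain-Python sketch; the variable names are hypothetical. Spark's `F.round` rounds half up while Python's `round` rounds half to even, but the two agree for this value.

```python
from datetime import datetime

# Last qualifying pair from the data used in the answer.
biking = datetime(2018, 3, 4, 11, 12)
running = datetime(2018, 3, 4, 10, 13)

# Same computation as the Spark expression: seconds difference divided by 3600.
hours = (biking - running).total_seconds() / 3600
rounded = round(hours)

print(hours)    # fractional hours, a bit under 1
print(rounded)  # whole hours after rounding
```

If sub-hour precision matters, one option is to skip the rounding step and keep the fractional value, or to report the duration in minutes.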