How to use PySpark to get the durations described below?

Suppose we have a DataFrame (df) defined in PySpark. How can I use PySpark to get the duration between the first and the last biking action within the same day, and save the results to a DataFrame with columns such as first_biking_timedetails, last_biking_timedetails, and durations_between_first_last? Note: there can be other actions between the first and the last biking action. Also, if there is only one biking action on a given day, we should not get a duration for that day (since there is nothing to subtract, e.g. the date 3/3/18).

Here is a sample result for the date 3/01/2018:

duration_03_01 = 13:12 (last biking time) - 5:12 (first biking time) = 8 hours
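
As a quick sanity check of that arithmetic in plain Python (standard library only, not part of the Spark solution):

from datetime import datetime

# parse the two clock times and take the difference in hours
first = datetime.strptime('3/1/2018 5:12', '%m/%d/%Y %H:%M')
last = datetime.strptime('3/1/2018 13:12', '%m/%d/%Y %H:%M')
print((last - first).total_seconds() / 3600)  # 8.0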

A sample of the df is below:

TimeDetails Actions
3/1/18 5:12 Biking
3/1/18 6:12 Running
3/1/18 7:12 Swimming
3/1/18 8:12 Running
3/1/18 9:12 Swimming
3/1/18 10:12 Biking
3/1/18 11:12 Biking
3/1/18 12:12 Running
3/1/18 13:12 Biking
3/2/18 4:12 Biking
3/2/18 5:12 Swimming
3/2/18 6:12 Running
3/2/18 7:12 Biking
3/2/18 8:12 Running
3/3/18 4:16 Biking
3/4/18 5:13 Running
3/4/18 6:13 Biking
3/4/18 7:13 Running
3/4/18 8:13 Swimming
3/4/18 9:13 Running
3/4/18 10:13 Running
3/4/18 11:13 Biking

Some of my code:

df  = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Swimming')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Swimming')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Swimming')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Swimming')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:13','Biking')
      ], ['TimeDetails','Actions']
    )

The expected output would look like this:

   First_Biking_time  action_1  Last_Biking_time  action_2  Durations_in_Hour
1  3/1/18 5:12        Biking    3/1/18 13:12      Biking    8
2  3/2/18 4:12        Biking    3/2/18 7:12       Biking    3
3  3/4/18 6:13        Biking    3/4/18 11:13      Biking    5

Can someone provide some PySpark code for this? Also, is there a way to solve this in PySpark SQL?

Thanks

Your df:

df  = spark.createDataFrame(
      [
    ('3/1/2018 5:12','Biking')
    ,('3/1/2018 6:12','Running')
    ,('3/1/2018 7:12','Swimming')
    ,('3/1/2018 8:12','Running')
    ,('3/1/2018 9:12','Swimming')
    ,('3/1/2018 10:12','Biking')
    ,('3/1/2018 11:12','Biking')
    ,('3/1/2018 12:12','Running')
    ,('3/1/2018 13:12','Biking')
    ,('3/2/2018 4:12','Biking')
    ,('3/2/2018 5:12','Swimming')
    ,('3/2/2018 6:12','Running')
    ,('3/2/2018 7:12','Biking')
    ,('3/2/2018 8:12','Running')
    ,('3/3/2018 4:16','Biking')
    ,('3/4/2018 5:13','Biking')
    ,('3/4/2018 6:13','Running')
    ,('3/4/2018 7:13','Running')
    ,('3/4/2018 8:13','Swimming')
    ,('3/4/2018 9:13','Running')
    ,('3/4/2018 10:13','Running')
    ,('3/4/2018 11:13','Biking')
      ], ['TimeDetails','Actions']
    )

Use a window function. You can also apply this solution to other actions:

from pyspark.sql import functions as F
from pyspark.sql import Window

# parse the raw strings into proper timestamps, then extract the calendar day
df = df\
    .withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
    .withColumn('date', F.to_date('TimeDetails'))

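At this point df.printSchema() should show something like the following (exact nullability flags may vary):

df.printSchema()
# root
#  |-- TimeDetails: timestamp (nullable = true)
#  |-- Actions: string (nullable = true)
#  |-- date: date (nullable = true)
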
# partition by action and day; since every row in a partition shares the same
# 'date', ordering by 'date' makes all rows peers, so the default RANGE frame
# lets first()/last() below see the whole partition
w = Window.partitionBy('Actions', 'date').orderBy('date')

# earliest and latest occurrence of each action on each day, plus the gap in hours
generic = df\
            .withColumn('first_record', F.first(F.col('TimeDetails'), ignorenulls=True).over(w))\
            .withColumn('last_record', F.last(F.col('TimeDetails'), ignorenulls=True).over(w))\
            .withColumn('Durations_in_Hours', (F.unix_timestamp('last_record') - F.unix_timestamp('first_record'))/3600)\
            .orderBy('TimeDetails')

# keep only the Biking rows, rename the columns, and collapse the per-row
# window results to one row per day; Durations_in_Hours == 0 corresponds to
# days with a single ride (e.g. 3/3/18), which must be excluded
biking = generic\
            .filter(F.col('Actions') == 'Biking')\
            .select(F.col('first_record').alias('First_Biking_time'),
                    F.col('Actions').alias('action_1'),
                    F.col('last_record').alias('Last_Biking_time'),
                    F.col('Actions').alias('action_2'),
                    F.col('Durations_in_Hours'))\
            .dropDuplicates()\
            .filter(F.col('Durations_in_Hours') != 0)\
            .orderBy('First_Biking_time')

biking.show()

Output:

+-------------------+--------+-------------------+--------+------------------+
|  First_Biking_time|action_1|   Last_Biking_time|action_2|Durations_in_Hours|
+-------------------+--------+-------------------+--------+------------------+
|2018-03-01 05:12:00|  Biking|2018-03-01 13:12:00|  Biking|               8.0|
|2018-03-02 04:12:00|  Biking|2018-03-02 07:12:00|  Biking|               3.0|
|2018-03-04 05:13:00|  Biking|2018-03-04 11:13:00|  Biking|               6.0|
+-------------------+--------+-------------------+--------+------------------+
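
For the second part of the question (solving it in PySpark SQL), here is a minimal sketch, assuming df already carries the parsed TimeDetails timestamp and the date column from above; the temp-view name activities is arbitrary. A plain GROUP BY per day replaces the window, and HAVING COUNT(*) > 1 drops single-ride days such as 3/3/18:

df.createOrReplaceTempView('activities')

biking_sql = spark.sql("""
    SELECT MIN(TimeDetails) AS First_Biking_time,
           'Biking'         AS action_1,
           MAX(TimeDetails) AS Last_Biking_time,
           'Biking'         AS action_2,
           (UNIX_TIMESTAMP(MAX(TimeDetails)) -
            UNIX_TIMESTAMP(MIN(TimeDetails))) / 3600 AS Durations_in_Hours
    FROM activities
    WHERE Actions = 'Biking'
    GROUP BY date
    HAVING COUNT(*) > 1
    ORDER BY First_Biking_time
""")
biking_sql.show()

This should return the same three rows as biking.show() above.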