Calculate deltatime in Pyspark 2.0.1 with python 2.6.6

My DF consists of two columns containing times, and I want to calculate the delta time between them.

The following is a sample of the original DF:

+-------------------+-------------------+
|               time|              time2|
+-------------------+-------------------+
|2017-01-13 00:17:21|2017-01-13 14:08:03|
|2017-01-13 14:08:08|2017-01-13 14:08:03|
|2017-01-13 14:08:59|2017-01-13 14:08:03|
|2017-01-13 04:21:42|2017-01-13 14:08:03|
+-------------------+-------------------+

The schema of the DF is as follows:

root
 |-- time: string (nullable = true)
 |-- time2: string (nullable = true)

I used the following approach:

import pyspark.sql.types as typ
import pyspark.sql.functions as fn
from pyspark.sql.functions import udf
import datetime
from time import mktime, strptime

def diffdates(t1, t2):
    #Date format: %Y-%m-%d %H:%M:%S
    delta= ((mktime(strptime(t1,"%Y-%m-%d %H:%M:%S")) - mktime(strptime(t2, "%Y-%m-%d %H:%M:%S"))))
    return (delta)



dt = udf(diffdates, typ.IntegerType())
Time_Diff = df.withColumn('Diff',(dt(df.time,df.time2)))

The resulting new column contains null values, as shown below:

+-------------------+-------------------+----+
|               time|              time2|Diff|
+-------------------+-------------------+----+
|2017-01-13 00:17:21|2017-01-13 14:08:03|null|
|2017-01-13 14:08:08|2017-01-13 14:08:03|null|
|2017-01-13 14:08:59|2017-01-13 14:08:03|null|
|2017-01-13 04:21:42|2017-01-13 14:08:03|null|
+-------------------+-------------------+----+

What should I do?
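A likely cause of the nulls: mktime returns a float, so diffdates returns a Python float, while the UDF is declared with IntegerType. When a UDF's return value does not match its declared return type, PySpark silently replaces it with null rather than raising an error. A minimal sketch of a fix that keeps the UDF approach is to cast the result to int (or, equivalently, declare the UDF with DoubleType):

import pyspark.sql.types as typ
from pyspark.sql.functions import udf
from time import mktime, strptime

def diffdates(t1, t2):
    # Date format: %Y-%m-%d %H:%M:%S
    delta = mktime(strptime(t1, "%Y-%m-%d %H:%M:%S")) - mktime(strptime(t2, "%Y-%m-%d %H:%M:%S"))
    return int(delta)  # cast so the value matches the declared IntegerType

dt = udf(diffdates, typ.IntegerType())
Time_Diff = df.withColumn('Diff', dt(df.time, df.time2))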

Alternatively, this is what I tried using Spark's built-in functions, and it works for me. Let me know if I am missing something:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> l = [('2017-01-13 00:17:21','2017-01-13 14:08:03'),('2017-01-13 14:08:08','2017-01-13 14:08:03'),('2017-01-13 14:08:59','2017-01-13 14:08:03'),('2017-01-13 04:21:42','2017-01-13 14:08:03')]
>>> df = spark.createDataFrame(l,['time1','time2'])
>>> df1 = df.select(df.time1.cast('timestamp'),df.time2.cast('timestamp'))
>>> df1.show()
+--------------------+--------------------+
|               time1|               time2|
+--------------------+--------------------+
|2017-01-13 00:17:...|2017-01-13 14:08:...|
|2017-01-13 14:08:...|2017-01-13 14:08:...|
|2017-01-13 14:08:...|2017-01-13 14:08:...|
|2017-01-13 04:21:...|2017-01-13 14:08:...|
+--------------------+--------------------+
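Note that the plain .cast('timestamp') works here because the strings are already in Spark's default yyyy-MM-dd HH:mm:ss layout; strings in any other layout would need to be parsed explicitly, e.g. with unix_timestamp and a matching pattern.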

>>> from pyspark.sql import functions as F
>>> timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
>>> timeDiff = (F.unix_timestamp('time1', format=timeFmt) - F.unix_timestamp('time2', format=timeFmt))
>>> df1 = df1.withColumn("delta",timeDiff) ## delta is in seconds
>>> df1.show(truncate=False)
+---------------------+---------------------+------+
|time1                |time2                |delta |
+---------------------+---------------------+------+
|2017-01-13 00:17:21.0|2017-01-13 14:08:03.0|-49842|
|2017-01-13 14:08:08.0|2017-01-13 14:08:03.0|5     |
|2017-01-13 14:08:59.0|2017-01-13 14:08:03.0|56    |
|2017-01-13 04:21:42.0|2017-01-13 14:08:03.0|-35181|
+---------------------+---------------------+------+
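One caveat: timeFmt above does not actually match the data (the strings use a space rather than 'T' and have no milliseconds), but that is harmless here because time1 and time2 were already cast to timestamp, and unix_timestamp only applies the pattern to string columns. A sketch of the same delta computed directly on the original string columns, where the pattern must match the data:

>>> strFmt = "yyyy-MM-dd HH:mm:ss"
>>> timeDiffStr = F.unix_timestamp('time1', format=strFmt) - F.unix_timestamp('time2', format=strFmt)
>>> df.withColumn("delta", timeDiffStr).show(truncate=False)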
>>> df1.groupby('time2').agg(F.min('delta')).show()
+--------------------+----------+
|               time2|min(delta)|
+--------------------+----------+
|2017-01-13 14:08:...|    -49842|
+--------------------+----------+
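The min here picks the most negative delta. If "closest in time" is what is wanted instead, one variation (a sketch, using the same columns as above) is to aggregate the absolute difference:

>>> df1.groupby('time2').agg(F.min(F.abs(df1.delta))).show()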