Calculating duration by subtracting two datetime columns in string format
I have a Spark DataFrame consisting of a series of dates:
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True),
                     StructField('ANI', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
What I want to do is find the duration by subtracting StartDateTime from EndDateTime. I figured I would try to do this with a function:
# Function to calculate time delta
def time_delta(y,x):
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta
# create new RDD and add new column 'Duration' by applying time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime))
However, this just gives me:
>>> df2.show()
ID EndDateTime StartDateTime ANI Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null
I'm not sure if my approach is correct. If not, I would gladly accept another suggested way to achieve this.
Thanks to David Griffin. Here is how to do this, for future reference.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf
# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
# define timedelta function (obtain duration in seconds)
def time_delta(y,x):
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta
# register as a UDF
f = udf(time_delta, IntegerType())
# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime))
Applying time_delta() will give you the duration in seconds:
>>> df2.show()
ID EndDateTime StartDateTime Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... 258
X02 2014-02-13T12:35:... 2014-02-13T12:32:... 204
X03 2014-02-13T12:36:... 2014-02-13T12:32:... 228
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... 268
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... 202
As of Spark 1.5 you can use unix_timestamp:
from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
- F.unix_timestamp('StartDateTime', format=timeFmt))
df = df.withColumn("Duration", timeDiff)
Note the Java-style time format.
>>> df.show()
+---+--------------------+--------------------+--------+
| ID| EndDateTime| StartDateTime|Duration|
+---+--------------------+--------------------+--------+
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...| 258|
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...| 204|
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...| 228|
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...| 269|
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...| 202|
+---+--------------------+--------------------+--------+
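One thing to keep in mind: unix_timestamp truncates to whole seconds, which is why XO4 shows 269 here but 268 with the millisecond-aware UDF above. If you need the fractional part, a minimal sketch (assuming your Spark version can cast these ISO-8601 strings to timestamp directly):
from pyspark.sql import functions as F

# Cast string -> timestamp -> double (epoch seconds, fraction preserved)
timeDiffExact = (F.col('EndDateTime').cast('timestamp').cast('double')
                 - F.col('StartDateTime').cast('timestamp').cast('double'))
df = df.withColumn('DurationExact', timeDiffExact)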
datediff(Column end, Column start)
Returns the number of days from start to end.
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
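For reference, a minimal sketch of datediff through the DataFrame API, using the question's columns (the to_date format string is an assumption; datediff returns whole days, not seconds):
from pyspark.sql import functions as F

# Parse the strings as dates, then count the days between them
df_days = df.withColumn(
    'DurationDays',
    F.datediff(F.to_date('EndDateTime', "yyyy-MM-dd'T'HH:mm:ss.SSS"),
               F.to_date('StartDateTime', "yyyy-MM-dd'T'HH:mm:ss.SSS")))
df_days.show()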
Here is a working version for Spark 2.x, derived from jason's answer:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.types import StringType, StructType, StructField
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
# register as a UDF
from datetime import datetime
sqlContext.registerFunction(
    "time_delta",
    lambda y, x: (datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
                  - datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')).total_seconds())
df.createOrReplaceTempView("Test_table")
spark.sql("SELECT ID,EndDateTime,StartDateTime,time_delta(EndDateTime,StartDateTime) as time_delta FROM Test_table").show()
sc.stop()
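registerFunction has since been deprecated; a sketch of the equivalent registration with the newer API (Spark 2.x+, return type declared explicitly) would be:
from pyspark.sql.types import DoubleType

# Same UDF, registered through spark.udf instead of sqlContext
spark.udf.register(
    'time_delta',
    lambda y, x: (datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
                  - datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')).total_seconds(),
    DoubleType())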
This can be done in spark-sql by converting the string date to a timestamp and then taking the difference.
1: Convert to timestamp:
CAST(UNIX_TIMESTAMP(MY_COL_NAME,'dd-MMM-yy') as TIMESTAMP)
2: Use the datediff function to get the difference between the dates.
These combine into a nested expression like:
spark.sql("select COL_1, COL_2, datediff( CAST( UNIX_TIMESTAMP( COL_1,'dd-MMM-yy') as TIMESTAMP), CAST( UNIX_TIMESTAMP( COL_2,'dd-MMM-yy') as TIMESTAMP) ) as LAG_in_days from MyTable")
The result is:
+---------+---------+-----------+
| COL_1| COL_2|LAG_in_days|
+---------+---------+-----------+
|24-JAN-17|16-JAN-17| 8|
|19-JAN-05|18-JAN-05| 1|
|23-MAY-06|23-MAY-06| 0|
|18-AUG-06|17-AUG-06| 1|
+---------+---------+-----------+
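The same computation can also be written through the DataFrame API; a rough sketch, with COL_1, COL_2 and the dd-MMM-yy format taken from the example above:
from pyspark.sql import functions as F

# Parse both string columns, then count the days between them
df_lag = df.withColumn(
    'LAG_in_days',
    F.datediff(F.unix_timestamp('COL_1', 'dd-MMM-yy').cast('timestamp'),
               F.unix_timestamp('COL_2', 'dd-MMM-yy').cast('timestamp')))
df_lag.show()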
Use DoubleType instead of IntegerType: total_seconds() returns a float, and declaring the UDF's return type as IntegerType can make the value come back as null.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import udf
# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
# define timedelta function (obtain duration in seconds)
def time_delta(y,x):
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta
# register as a UDF
f = udf(time_delta, DoubleType())
# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime))
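As a quick check (illustrative only), the Duration column now comes back as a double and keeps the fractional seconds that an IntegerType declaration would drop:
df2.printSchema()                      # Duration: double
df2.select('ID', 'Duration').show()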