如何从 Pyspark 中的日期列中减去天数列?
How to subtract a column of days from a column of dates in Pyspark?
给定以下 PySpark DataFrame
df = sqlContext.createDataFrame([('2015-01-15', 10),
('2015-02-15', 5)],
('date_col', 'days_col'))
如何从日期列中减去天数列?在此示例中,结果列应为 ['2015-01-05', '2015-02-10']
.
我查看了 pyspark.sql.functions.date_sub()
,但它需要一个日期列和一天,即 date_sub(df['date_col'], 10)
。理想情况下,我更愿意做 date_sub(df['date_col'], df['days_col'])
.
我也试过创建一个 UDF:
from datetime import timedelta
def subtract_date(start_date, days_to_subtract):
return start_date - timedelta(days_to_subtract)
subtract_date_udf = udf(subtract_date, DateType())
df.withColumn('subtracted_dates', subtract_date_udf(df['date_col'], df['days_col'])
这在技术上可行,但我读到在 Spark 和 Python 之间步进可能会导致大型数据集出现性能问题。我现在可以坚持使用这个解决方案(无需过早优化),但我的直觉告诉我必须有一种方法可以在不使用 Python UDF 的情况下完成这个简单的事情。
这不是最优雅的解决方案,但如果你不想在 Scala 中破解 SQL 表达式(不是说它应该很难,但这些是 sql
私有的)像这样应该可以解决问题:
from pyspark.sql import Column
def date_sub_(c1: Column, c2: Column) -> Column:
return ((c1.cast("timestamp").cast("long") - 60 * 60 * 24 * c2)
.cast("timestamp").cast("date"))
对于 Python 2.x 只需删除类型注释。
我使用 selectExpr
解决了这个问题。
df.selectExpr('date_sub(date_col, day_col) as subtracted_dates')
如果要将列附加到原始 DF,只需将 *
添加到表达式
df.selectExpr('*', 'date_sub(date_col, day_col) as subtracted_dates')
格式略有不同,但也有效:
df.registerTempTable("dfTbl")
newdf = spark.sql("""
SELECT *, date_sub(d.date_col, d.day_col) AS DateSub
FROM dfTbl d
""")
使用expr
函数(如果你有dynamic values
从要减去的列中):
>>> from pyspark.sql.functions import *
>>> df.withColumn('substracted_dates',expr("date_sub(date_col,days_col)"))
使用withColumn函数(如果你有literal values
进行减法):
>>> df.withColumn('substracted_dates',date_sub('date_col',<int_literal_value>))
给定以下 PySpark DataFrame
df = sqlContext.createDataFrame([('2015-01-15', 10),
('2015-02-15', 5)],
('date_col', 'days_col'))
如何从日期列中减去天数列?在此示例中,结果列应为 ['2015-01-05', '2015-02-10']
.
我查看了 pyspark.sql.functions.date_sub()
,但它需要一个日期列和一天,即 date_sub(df['date_col'], 10)
。理想情况下,我更愿意做 date_sub(df['date_col'], df['days_col'])
.
我也试过创建一个 UDF:
from datetime import timedelta
def subtract_date(start_date, days_to_subtract):
return start_date - timedelta(days_to_subtract)
subtract_date_udf = udf(subtract_date, DateType())
df.withColumn('subtracted_dates', subtract_date_udf(df['date_col'], df['days_col'])
这在技术上可行,但我读到在 Spark 和 Python 之间步进可能会导致大型数据集出现性能问题。我现在可以坚持使用这个解决方案(无需过早优化),但我的直觉告诉我必须有一种方法可以在不使用 Python UDF 的情况下完成这个简单的事情。
这不是最优雅的解决方案,但如果你不想在 Scala 中破解 SQL 表达式(不是说它应该很难,但这些是 sql
私有的)像这样应该可以解决问题:
from pyspark.sql import Column
def date_sub_(c1: Column, c2: Column) -> Column:
return ((c1.cast("timestamp").cast("long") - 60 * 60 * 24 * c2)
.cast("timestamp").cast("date"))
对于 Python 2.x 只需删除类型注释。
我使用 selectExpr
解决了这个问题。
df.selectExpr('date_sub(date_col, day_col) as subtracted_dates')
如果要将列附加到原始 DF,只需将 *
添加到表达式
df.selectExpr('*', 'date_sub(date_col, day_col) as subtracted_dates')
格式略有不同,但也有效:
df.registerTempTable("dfTbl")
newdf = spark.sql("""
SELECT *, date_sub(d.date_col, d.day_col) AS DateSub
FROM dfTbl d
""")
使用expr
函数(如果你有dynamic values
从要减去的列中):
>>> from pyspark.sql.functions import *
>>> df.withColumn('substracted_dates',expr("date_sub(date_col,days_col)"))
使用withColumn函数(如果你有literal values
进行减法):
>>> df.withColumn('substracted_dates',date_sub('date_col',<int_literal_value>))