如何用纯 PySpark 函数替换 Timedelta Pandas 函数?
How to replace the Timedelta Pandas function with a pure PySpark function?
我正在 PySpark 中开发一个小脚本,它生成一个日期序列(今天日期前 36 个月)和(同时应用截断作为该月的第一天)。总的来说,我成功完成了这项任务
但是借助 Pandas 包 Timedelta 来计算时间增量。
有没有办法用纯 PySpark 函数替换 Pandas 中的这个 Timedelta?
import pandas as pd
from datetime import date, timedelta, datetime
from pyspark.sql.functions import col, date_trunc
today = datetime.today()
data = [((date(today.year, today.month, 1) - pd.Timedelta(36,'M')),date(today.year, today.month, 1))] # I want to replace this Pandas function
df = spark.createDataFrame(data, ["minDate", "maxDate"])
+----------+----------+
| minDate| maxDate|
+----------+----------+
|2016-10-01|2019-10-01|
+----------+----------+
import pyspark.sql.functions as f
df = df.withColumn("monthsDiff", f.months_between("maxDate", "minDate"))\
.withColumn("repeat", f.expr("split(repeat(',', monthsDiff), ',')"))\
.select("*", f.posexplode("repeat").alias("date", "val"))\ #
.withColumn("date", f.expr("add_months(minDate, date)"))\
.select('date')\
.show(n=50)
+----------+
| date|
+----------+
|2016-10-01|
|2016-11-01|
|2016-12-01|
|2017-01-01|
|2017-02-01|
|2017-03-01|
etc...
+----------+
您可以使用 Pyspark 内置的 trunc
函数。
pyspark.sql.functions.trunc(日期,格式)
Returns 日期截断为格式指定的单位。
Parameters:
format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
假设我有一个下面的数据框。
list = [(1,),]
df=spark.createDataFrame(list, ['id'])
import pyspark.sql.functions as f
df=df.withColumn("start_date" ,f.add_months(f.trunc(f.current_date(),"month") ,-36))
df=df.withColumn("max_date" ,f.trunc(f.current_date(),"month"))
>>> df.show()
+---+----------+----------+
| id|start_date| max_date|
+---+----------+----------+
| 1|2016-10-01|2019-10-01|
+---+----------+----------+
这里是 link,其中包含有关 Spark 日期函数的更多详细信息。
我正在 PySpark 中开发一个小脚本,它生成一个日期序列(今天日期前 36 个月)和(同时应用截断作为该月的第一天)。总的来说,我成功完成了这项任务
但是借助 Pandas 包 Timedelta 来计算时间增量。
有没有办法用纯 PySpark 函数替换 Pandas 中的这个 Timedelta?
import pandas as pd
from datetime import date, timedelta, datetime
from pyspark.sql.functions import col, date_trunc
today = datetime.today()
data = [((date(today.year, today.month, 1) - pd.Timedelta(36,'M')),date(today.year, today.month, 1))] # I want to replace this Pandas function
df = spark.createDataFrame(data, ["minDate", "maxDate"])
+----------+----------+
| minDate| maxDate|
+----------+----------+
|2016-10-01|2019-10-01|
+----------+----------+
import pyspark.sql.functions as f
df = df.withColumn("monthsDiff", f.months_between("maxDate", "minDate"))\
.withColumn("repeat", f.expr("split(repeat(',', monthsDiff), ',')"))\
.select("*", f.posexplode("repeat").alias("date", "val"))\ #
.withColumn("date", f.expr("add_months(minDate, date)"))\
.select('date')\
.show(n=50)
+----------+
| date|
+----------+
|2016-10-01|
|2016-11-01|
|2016-12-01|
|2017-01-01|
|2017-02-01|
|2017-03-01|
etc...
+----------+
您可以使用 Pyspark 内置的 trunc
函数。
pyspark.sql.functions.trunc(日期,格式) Returns 日期截断为格式指定的单位。
Parameters:
format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
假设我有一个下面的数据框。
list = [(1,),]
df=spark.createDataFrame(list, ['id'])
import pyspark.sql.functions as f
df=df.withColumn("start_date" ,f.add_months(f.trunc(f.current_date(),"month") ,-36))
df=df.withColumn("max_date" ,f.trunc(f.current_date(),"month"))
>>> df.show()
+---+----------+----------+
| id|start_date| max_date|
+---+----------+----------+
| 1|2016-10-01|2019-10-01|
+---+----------+----------+
这里是 link,其中包含有关 Spark 日期函数的更多详细信息。