如何在spark中过滤变量日期?
How to filter variable date In spark?
我有一个包含 2010 年到 2040 年之间所有日期的数据集
1/1/2010
1/2/2010
1/3/2010
...
...
...
12/31/2040
我正在使用 Spark 转换数据,我正在尝试应用一个过滤器,该过滤器只保留 [今天 - 2 年,未来开放]
的日期
我确实尝试过使用 Spark 提供的所有日期操作函数,包括
df_calendar.filter(datediff(to_date(col("date"),"m/d/yyyy"),current_date()).gt(-730))
df_calendar.select("*").withColumn("datediff",datediff(to_date(col("date"),"m/d/yyyy"),current_date())).filter(col("datediff")>(-730))
val today = df_calendar.select(date_sub(current_date(),730))
df_calendar.filter((to_date(col("date"),"m/d/yyyy") > today ))
但我总是得到相同的结果,数据集 return 所有值都从 2021 年 1 月 1 日开始,因为它可以追溯到“2 年”,但不是以天数计算。
请注意,我也尝试使用 year() 函数,它也 returns 相同的结果,我对每次 returns 的结果感到非常困惑,我真的需要你的帮助.
不太清楚为什么您的代码不起作用。
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [seed_date + timedelta(days=i) for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date=datetime.date(2010, 1, 1))
>>> calendar_list[-1]
Row(date=datetime.date(2040, 12, 31))
>>>
>>> df3 = df_calendar.filter((F.col('date') > two_years_back) \
... & (F.col('date') < date.today())
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date=datetime.date(2020, 2, 26))
>>> filtered_list[-1]
Row(date=datetime.date(2022, 2, 23))
>>>
编辑:所以我猜你的代码的问题可能是你使用了错误的模式。
>>>
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [(seed_date + timedelta(days=i)).strftime('%-m/%-d/%Y') for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date='1/1/2010')
>>> calendar_list[-1]
Row(date='12/31/2040')
>>>
>>> filtered_df = df_calendar.filter((F.to_date(F.col("date"), 'M/d/yyyy') < date.today()) \
... & (F.to_date(F.col("date"), 'M/d/yyyy') > two_years_back))
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date='2/26/2020')
>>> filtered_list[-1]
Row(date='2/23/2022')
>>>
我有一个包含 2010 年到 2040 年之间所有日期的数据集
1/1/2010
1/2/2010
1/3/2010
...
...
...
12/31/2040
我正在使用 Spark 转换数据,我正在尝试应用一个过滤器,该过滤器只保留 [今天 - 2 年,未来开放]
的日期我确实尝试过使用 Spark 提供的所有日期操作函数,包括
df_calendar.filter(datediff(to_date(col("date"),"m/d/yyyy"),current_date()).gt(-730))
df_calendar.select("*").withColumn("datediff",datediff(to_date(col("date"),"m/d/yyyy"),current_date())).filter(col("datediff")>(-730))
val today = df_calendar.select(date_sub(current_date(),730))
df_calendar.filter((to_date(col("date"),"m/d/yyyy") > today ))
但我总是得到相同的结果,数据集 return 所有值都从 2021 年 1 月 1 日开始,因为它可以追溯到“2 年”,但不是以天数计算。 请注意,我也尝试使用 year() 函数,它也 returns 相同的结果,我对每次 returns 的结果感到非常困惑,我真的需要你的帮助.
不太清楚为什么您的代码不起作用。
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [seed_date + timedelta(days=i) for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date=datetime.date(2010, 1, 1))
>>> calendar_list[-1]
Row(date=datetime.date(2040, 12, 31))
>>>
>>> df3 = df_calendar.filter((F.col('date') > two_years_back) \
... & (F.col('date') < date.today())
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date=datetime.date(2020, 2, 26))
>>> filtered_list[-1]
Row(date=datetime.date(2022, 2, 23))
>>>
编辑:所以我猜你的代码的问题可能是你使用了错误的模式。
>>>
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [(seed_date + timedelta(days=i)).strftime('%-m/%-d/%Y') for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date='1/1/2010')
>>> calendar_list[-1]
Row(date='12/31/2040')
>>>
>>> filtered_df = df_calendar.filter((F.to_date(F.col("date"), 'M/d/yyyy') < date.today()) \
... & (F.to_date(F.col("date"), 'M/d/yyyy') > two_years_back))
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date='2/26/2020')
>>> filtered_list[-1]
Row(date='2/23/2022')
>>>