如何在 spark 中创建日期范围映射?

how to create a date range mapping in spark?

我有一个具有以下结构的 spark 数据框。

+-------+-------------------+
|country|     date_published|
+-------+-------------------+
|     UK|2020-04-15 00:00:00|
|     UK|2020-04-14 00:00:00|
|     UK|2020-04-09 00:00:00|
|     UK|2020-04-08 00:00:00|
|     UK|2020-04-07 00:00:00|
|     UK|2020-04-06 00:00:00|
|     UK|2020-04-03 00:00:00|
|     UK|2020-04-02 00:00:00|
|     UK|2020-04-01 00:00:00|
|     UK|2020-03-31 00:00:00|
|     UK|2020-03-30 00:00:00|
|     UK|2020-03-27 00:00:00|
|     UK|2020-03-26 00:00:00|
|     UK|2020-03-25 00:00:00|
|     UK|2020-03-24 00:00:00|
|     UK|2020-03-23 00:00:00|
|     UK|2020-03-20 00:00:00|
|     UK|2020-03-19 00:00:00|
|     UK|2020-03-18 00:00:00|
|     UK|2020-03-17 00:00:00|
+-------+-------------------+

我想根据此数据创建日期映射。条件,

  1. 2020-01-01 之前的所有日期都应映射为 "YTD"。

  2. 2019 年 4 月 15 日之前的所有日期应映射为 "LAST_1_YEAR"。

  3. 从 2019-01-01 到 2019-04-15(去年日期截至当天)的所有日期应映射为 "YTD_LAST_YEAR"

  4. 2019-04-15 之前的所有日期应映射为 "YEAR_AGO_1_YEAR"

  5. 我们可以创建两列,例如 ytd_map(条件 1, 3), last_year_map(条件 2,4)

列表中可能还有其他国家,以上条件应该适用于他们

我尝试过的方法是为每个国家/地区创建一个 max_date_published 的数据框,但我不确定如何分别为每个国家/地区过滤数据框。

df_data = df_data_cleaned.select("date_published","country").distinct().orderBy(F.desc("date_published"))
df_max_dt = df_data.groupBy("country").agg(F.max(F.col("date_published")))
df_max_dt.collect()

我试过了,现在可以用了。

spark.sql("select country,\
    date_published,\
    (case when date_published >= max_date_published_last_year then 'LAST_1_YEAR'\
     when date_published <= max_date_published_last_year and date_published >= add_months(max_date_published_last_year, -12) then  'YEAR_AGO_1_YEAR' else '' end) as MAT_MAPPING,\
     (case when date_published >= date_published_start_of_year then 'YTD'\
     when date_published <= max_date_published_last_year and date_published >= date_published_start_of_last_year\
     then 'YTD_LAST_YEAR'\
              else '' end) as YTD_MAPPING from\
    (select t.country, t.date_published, t.date_published_ya, t.max_date_published_current_year,\
    cast(add_months(t.max_date_published_current_year, -12) as timestamp) as max_date_published_last_year,\
          date_trunc('year', max_date_published_current_year) AS date_published_start_of_year,\
          date_trunc('year', cast(add_months(t.max_date_published_current_year, -12) as timestamp)) AS date_published_start_of_last_year\
          from\
    (select country,\
    date_published,cast(add_months(date_published, -12) as timestamp) as date_published_ya,\
    max(date_published)over(partition by country order by date_published desc) max_date_published_current_year from df_mintel_time) t) t2")