如何在 spark 中创建日期范围映射?
how to create a date range mapping in spark?
我有一个具有以下结构的 spark 数据框。
+-------+-------------------+
|country| date_published|
+-------+-------------------+
| UK|2020-04-15 00:00:00|
| UK|2020-04-14 00:00:00|
| UK|2020-04-09 00:00:00|
| UK|2020-04-08 00:00:00|
| UK|2020-04-07 00:00:00|
| UK|2020-04-06 00:00:00|
| UK|2020-04-03 00:00:00|
| UK|2020-04-02 00:00:00|
| UK|2020-04-01 00:00:00|
| UK|2020-03-31 00:00:00|
| UK|2020-03-30 00:00:00|
| UK|2020-03-27 00:00:00|
| UK|2020-03-26 00:00:00|
| UK|2020-03-25 00:00:00|
| UK|2020-03-24 00:00:00|
| UK|2020-03-23 00:00:00|
| UK|2020-03-20 00:00:00|
| UK|2020-03-19 00:00:00|
| UK|2020-03-18 00:00:00|
| UK|2020-03-17 00:00:00|
+-------+-------------------+
我想根据此数据创建日期映射。条件,
2020-01-01 之前的所有日期都应映射为 "YTD"。
2019 年 4 月 15 日之前的所有日期应映射为 "LAST_1_YEAR"。
从 2019-01-01 到 2019-04-15(去年日期截至当天)的所有日期应映射为 "YTD_LAST_YEAR"
2019-04-15 之前的所有日期应映射为 "YEAR_AGO_1_YEAR"
我们可以创建两列,例如 ytd_map(条件 1, 3), last_year_map(条件 2,4)
列表中可能还有其他国家,以上条件应该适用于他们
我尝试过的方法是为每个国家/地区创建一个 max_date_published 的数据框,但我不确定如何分别为每个国家/地区过滤数据框。
df_data = df_data_cleaned.select("date_published","country").distinct().orderBy(F.desc("date_published"))
df_max_dt = df_data.groupBy("country").agg(F.max(F.col("date_published")))
df_max_dt.collect()
我试过了,现在可以用了。
spark.sql("select country,\
date_published,\
(case when date_published >= max_date_published_last_year then 'LAST_1_YEAR'\
when date_published <= max_date_published_last_year and date_published >= add_months(max_date_published_last_year, -12) then 'YEAR_AGO_1_YEAR' else '' end) as MAT_MAPPING,\
(case when date_published >= date_published_start_of_year then 'YTD'\
when date_published <= max_date_published_last_year and date_published >= date_published_start_of_last_year\
then 'YTD_LAST_YEAR'\
else '' end) as YTD_MAPPING from\
(select t.country, t.date_published, t.date_published_ya, t.max_date_published_current_year,\
cast(add_months(t.max_date_published_current_year, -12) as timestamp) as max_date_published_last_year,\
date_trunc('year', max_date_published_current_year) AS date_published_start_of_year,\
date_trunc('year', cast(add_months(t.max_date_published_current_year, -12) as timestamp)) AS date_published_start_of_last_year\
from\
(select country,\
date_published,cast(add_months(date_published, -12) as timestamp) as date_published_ya,\
max(date_published)over(partition by country order by date_published desc) max_date_published_current_year from df_mintel_time) t) t2")
我有一个具有以下结构的 spark 数据框。
+-------+-------------------+
|country| date_published|
+-------+-------------------+
| UK|2020-04-15 00:00:00|
| UK|2020-04-14 00:00:00|
| UK|2020-04-09 00:00:00|
| UK|2020-04-08 00:00:00|
| UK|2020-04-07 00:00:00|
| UK|2020-04-06 00:00:00|
| UK|2020-04-03 00:00:00|
| UK|2020-04-02 00:00:00|
| UK|2020-04-01 00:00:00|
| UK|2020-03-31 00:00:00|
| UK|2020-03-30 00:00:00|
| UK|2020-03-27 00:00:00|
| UK|2020-03-26 00:00:00|
| UK|2020-03-25 00:00:00|
| UK|2020-03-24 00:00:00|
| UK|2020-03-23 00:00:00|
| UK|2020-03-20 00:00:00|
| UK|2020-03-19 00:00:00|
| UK|2020-03-18 00:00:00|
| UK|2020-03-17 00:00:00|
+-------+-------------------+
我想根据此数据创建日期映射。条件,
2020-01-01 之前的所有日期都应映射为 "YTD"。
2019 年 4 月 15 日之前的所有日期应映射为 "LAST_1_YEAR"。
从 2019-01-01 到 2019-04-15(去年日期截至当天)的所有日期应映射为 "YTD_LAST_YEAR"
2019-04-15 之前的所有日期应映射为 "YEAR_AGO_1_YEAR"
我们可以创建两列,例如 ytd_map(条件 1, 3), last_year_map(条件 2,4)
列表中可能还有其他国家,以上条件应该适用于他们
我尝试过的方法是为每个国家/地区创建一个 max_date_published 的数据框,但我不确定如何分别为每个国家/地区过滤数据框。
df_data = df_data_cleaned.select("date_published","country").distinct().orderBy(F.desc("date_published"))
df_max_dt = df_data.groupBy("country").agg(F.max(F.col("date_published")))
df_max_dt.collect()
我试过了,现在可以用了。
spark.sql("select country,\
date_published,\
(case when date_published >= max_date_published_last_year then 'LAST_1_YEAR'\
when date_published <= max_date_published_last_year and date_published >= add_months(max_date_published_last_year, -12) then 'YEAR_AGO_1_YEAR' else '' end) as MAT_MAPPING,\
(case when date_published >= date_published_start_of_year then 'YTD'\
when date_published <= max_date_published_last_year and date_published >= date_published_start_of_last_year\
then 'YTD_LAST_YEAR'\
else '' end) as YTD_MAPPING from\
(select t.country, t.date_published, t.date_published_ya, t.max_date_published_current_year,\
cast(add_months(t.max_date_published_current_year, -12) as timestamp) as max_date_published_last_year,\
date_trunc('year', max_date_published_current_year) AS date_published_start_of_year,\
date_trunc('year', cast(add_months(t.max_date_published_current_year, -12) as timestamp)) AS date_published_start_of_last_year\
from\
(select country,\
date_published,cast(add_months(date_published, -12) as timestamp) as date_published_ya,\
max(date_published)over(partition by country order by date_published desc) max_date_published_current_year from df_mintel_time) t) t2")