Create a list of maps for columns based on days

I have a master table as follows:

+-----------+----------+-------------+
|  Age      | Gender   | date        |
+-----------+----------+-------------+
|  [1,2]    |   M      |  2021-01-01 | 
|  [11,13]  |   M      |  2021-01-10 | 
|  [4,5]    |   M      |  2021-01-15 |
|  [3]      |   M      |  2021-01-30 |
|  [7]      |   F      |  2021-02-01 |
|  [2]      |   F      |  2021-02-16 |
|  [6]      |   F      |  2021-02-20 |
+-----------+----------+-------------+
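
The sample data can be created like this (a sketch for reproducibility; here Age is an array of ints and date is a string, the actual column types may differ):

df = spark.createDataFrame(
    [
        ([1, 2], "M", "2021-01-01"),
        ([11, 13], "M", "2021-01-10"),
        ([4, 5], "M", "2021-01-15"),
        ([3], "M", "2021-01-30"),
        ([7], "F", "2021-02-01"),
        ([2], "F", "2021-02-16"),
        ([6], "F", "2021-02-20"),
    ],
    ["Age", "Gender", "date"]
)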

The required output is shown below. Here I have built the map over 15 days, but the date_age_map interval could be changed to 15, 30, 45 days, etc.

+-----------+----------+-----------------------------------------------------------+
|  Age      | Gender   | date_age_map                                              |
+-----------+----------+-----------------------------------------------------------+
|  [1,2]    |   M      |  [2021-01-01-->[1,2]]                                     | 
|  [11,13]  |   M      |  [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]]]            | 
|  [4,5]    |   M      |  [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]],[2021-01-15-->[4,5]]] |
|  [3]      |   M      |  [2021-01-30-->[3]]                                       |
|  [7]      |   F      |  [2021-02-01-->[7]]                                       |
|  [2]      |   F      |  [[2021-02-01-->[7]],[2021-02-16-->[2]]]                  |
|  [6]      |   F      |  [2021-02-20-->[6]]                                       |
+-----------+----------+-----------------------------------------------------------+

Here is what I have tried; it collects the maps, but does not limit them to the last 15 days:

spark.sql("""
select Age,Gender,collect_list(date_age_map) over (partition by gender order by date) as date_age_map from 
select Age,Gender,map(date,age) as date_age_map from master""")

Is there any way to do this with window functions, using the Spark DataFrame API, Spark SQL, or a UDF?

You can use a Window with rangeBetween to fix the sliding interval of the window while collecting the date and age values to build the map.

Like this:

from pyspark.sql import Window
import pyspark.sql.functions as F

days = 15  # window length in days; change to 30, 45, ... as needed

# define a window with a range between 15 days preceding and the current row
# 86400 is the number of seconds in one day
w = Window.partitionBy("gender").orderBy("date2").rangeBetween(-days * 86400, 0)

df1 = df.withColumn(
    "date2",
    F.col("date").cast("timestamp").cast("long")
).withColumn(
    "date_age_map",
    F.map_from_arrays(
        F.collect_list("date").over(w),
        F.collect_list("age").over(w)
    )
).drop("date2")

df1.show(truncate=False)

#+--------+------+----------+--------------------------------------------------------------------+
#|age     |gender|date      |date_age_map                                                        |
#+--------+------+----------+--------------------------------------------------------------------+
#|[7]     |F     |2021-02-01|[2021-02-01 -> [7]]                                                 |
#|[2]     |F     |2021-02-16|[2021-02-01 -> [7], 2021-02-16 -> [2]]                              |
#|[6]     |F     |2021-02-20|[2021-02-16 -> [2], 2021-02-20 -> [6]]                              |
#|[1, 2]  |M     |2021-01-01|[2021-01-01 -> [1, 2]]                                              |
#|[11, 13]|M     |2021-01-10|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13]]                      |
#|[4, 5]  |M     |2021-01-15|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13], 2021-01-15 -> [4, 5]]|
#|[3]     |M     |2021-01-30|[2021-01-15 -> [4, 5], 2021-01-30 -> [3]]                           |
#+--------+------+----------+--------------------------------------------------------------------+
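
Since the question also mentions Spark SQL: the same bounded window can be written in plain SQL by ordering on the date cast to epoch seconds and using a numeric RANGE frame. A sketch of the equivalent query (1296000 = 15 * 86400 seconds; the temp view name master matches the table in your query):

df.createOrReplaceTempView("master")

spark.sql("""
    SELECT
        Age,
        Gender,
        date,
        map_from_arrays(
            collect_list(date) OVER (
                PARTITION BY Gender
                ORDER BY CAST(CAST(date AS timestamp) AS long)
                RANGE BETWEEN 1296000 PRECEDING AND CURRENT ROW),
            collect_list(Age) OVER (
                PARTITION BY Gender
                ORDER BY CAST(CAST(date AS timestamp) AS long)
                RANGE BETWEEN 1296000 PRECEDING AND CURRENT ROW)
        ) AS date_age_map
    FROM master
""").show(truncate=False)

For a 30 or 45 day window, change the frame boundary to 2592000 (30 * 86400) or 3888000 (45 * 86400), or simply change days in the DataFrame version above.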