Create a list of maps for columns based on days
I have a master table as follows:
+-----------+----------+-------------+
| Age       | Gender   | date        |
+-----------+----------+-------------+
| [1,2]     | M        | 2021-01-01  |
| [11,13]   | M        | 2021-01-10  |
| [4,5]     | M        | 2021-01-15  |
| [3]       | M        | 2021-01-30  |
| [7]       | F        | 2021-02-01  |
| [2]       | F        | 2021-02-16  |
| [6]       | F        | 2021-02-20  |
+-----------+----------+-------------+
The desired output is shown below. I have used a 15-day interval for the map here, but the date-age map interval should be configurable to 15, 30, 45 days, etc.
+-----------+----------+--------------------------------------------------------------------+
| Age       | Gender   | date_age_map                                                       |
+-----------+----------+--------------------------------------------------------------------+
| [1,2]     | M        | [2021-01-01-->[1,2]]                                               |
| [11,13]   | M        | [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]]]                      |
| [4,5]     | M        | [[2021-01-01-->[1,2]],[2021-01-10-->[11,13]],[2021-01-15-->[4,5]]] |
| [3]       | M        | [2021-01-30-->[3]]                                                 |
| [7]       | F        | [2021-02-01-->[7]]                                                 |
| [2]       | F        | [[2021-02-01-->[7]],[2021-02-16-->[2]]]                            |
| [6]       | F        | [2021-02-20-->[6]]                                                 |
+-----------+----------+--------------------------------------------------------------------+
I have tried the following:
spark.sql("""
select Age, Gender, collect_list(date_age_map) over (partition by Gender order by date) as date_age_map
from (select Age, Gender, date, map(date, Age) as date_age_map from master) t""")
Is there any way to do this with window functions, using the Spark DataFrame API, Spark SQL, or a UDF?
While collecting the age and date values to create the map, you can use a Window with rangeBetween to fix the sliding interval of the window, like this:
from pyspark.sql import Window
import pyspark.sql.functions as F

days = 15

# define a window with a range between `days` days preceding and the current row;
# 86400 is the number of seconds in one day
w = Window.partitionBy("gender").orderBy("date2").rangeBetween(-days * 86400, 0)

df1 = df.withColumn(
    "date2",
    # cast the date to epoch seconds so rangeBetween can use a numeric range
    F.col("date").cast("timestamp").cast("long")
).withColumn(
    "date_age_map",
    F.map_from_arrays(
        F.collect_list("date").over(w),
        F.collect_list("age").over(w)
    )
).drop("date2")

df1.show(truncate=False)
#+--------+------+----------+--------------------------------------------------------------------+
#|age |gender|date |date_age_map |
#+--------+------+----------+--------------------------------------------------------------------+
#|[7] |F |2021-02-01|[2021-02-01 -> [7]] |
#|[2] |F |2021-02-16|[2021-02-01 -> [7], 2021-02-16 -> [2]] |
#|[6] |F |2021-02-20|[2021-02-16 -> [2], 2021-02-20 -> [6]] |
#|[1, 2] |M |2021-01-01|[2021-01-01 -> [1, 2]] |
#|[11, 13]|M |2021-01-10|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13]] |
#|[4, 5] |M |2021-01-15|[2021-01-01 -> [1, 2], 2021-01-10 -> [11, 13], 2021-01-15 -> [4, 5]]|
#|[3] |M |2021-01-30|[2021-01-15 -> [4, 5], 2021-01-30 -> [3]] |
#+--------+------+----------+--------------------------------------------------------------------+
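Note that rangeBetween(-days * 86400, 0) is inclusive on both ends, so a row exactly 15 days back is still picked up (that is why 2021-01-15 appears in the 2021-01-30 row above); if you need a strict "less than 15 days" frame, use -(days * 86400 - 1) as the lower bound. Changing days to 30 or 45 adjusts the interval accordingly.

Since the question also asks about Spark SQL, the same frame can be expressed there as well. This is only a sketch, assuming the source DataFrame is registered as a temp view named master (the view name and the days variable are illustrative, not from the original post):

df.createOrReplaceTempView("master")

days = 15
# the f-string turns {days * 86400} into the numeric literal the RANGE frame needs
df2 = spark.sql(f"""
    SELECT age, gender, date,
           map_from_arrays(
               collect_list(date) OVER (PARTITION BY gender ORDER BY date2
                   RANGE BETWEEN {days * 86400} PRECEDING AND CURRENT ROW),
               collect_list(age) OVER (PARTITION BY gender ORDER BY date2
                   RANGE BETWEEN {days * 86400} PRECEDING AND CURRENT ROW)
           ) AS date_age_map
    FROM (SELECT *, CAST(CAST(date AS timestamp) AS long) AS date2 FROM master) t
""")
df2.show(truncate=False)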