Duplicating records between date gaps within a selected time interval in a PySpark dataframe
I have a PySpark dataframe that keeps track of the changes that occur in a product's price and status over months. This means that a new row is created only when a change (in status or price) occurred compared to the previous month, like in the dummy data below:
+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|5    |2019-10|
|1         |available|8    |2020-08|
|1         |limited  |8    |2020-10|
|2         |limited  |1    |2020-09|
|2         |limited  |3    |2020-10|
+----------+---------+-----+-------+
I would like to create a dataframe that shows the value for each of the last 6 months. This means I need to duplicate records wherever there is a gap in the dataframe above. For example, if the last 6 months are 2020-07, 2020-08, ..., 2020-12, the result for the dataframe above should be:
+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|5    |2020-07|
|1         |available|8    |2020-08|
|1         |available|8    |2020-09|
|1         |limited  |8    |2020-10|
|1         |limited  |8    |2020-11|
|1         |limited  |8    |2020-12|
|2         |limited  |1    |2020-09|
|2         |limited  |3    |2020-10|
|2         |limited  |3    |2020-11|
|2         |limited  |3    |2020-12|
+----------+---------+-----+-------+
Note that for product_id = 1 there was an older record from 2019-10 that was propagated up to 2020-08 and then trimmed, whereas for product_id = 2 there was no record before 2020-09, so the months 2020-07 and 2020-08 were not filled in (the product did not exist before 2020-09).
Since the dataframe consists of millions of records, a "brute force" solution with a for loop checking each product_id is quite slow. It seems like it should be possible to solve this with window functions, by creating another column next_month and then filling the gaps based on that column, but I don't know how to accomplish that.
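For illustration, a minimal sketch of that next_month idea (assuming Spark 2.4+ for sequence, a reporting window known up front as 2020-07 to 2020-12, and no record dated after the window end); the answers below develop this fully:
from pyspark.sql import functions as F, Window

# assumed reporting window: 2020-07 .. 2020-12
win_start, win_end = '2020-07-01', '2020-12-01'
w = Window.partitionBy('product_id').orderBy('start_month')

filled = (df
    .withColumn('start_month', F.to_date('month', 'yyyy-MM'))
    # a record stays valid until the month before the next record,
    # or until the end of the reporting window for the latest record
    .withColumn('end_month', F.coalesce(
        F.add_months(F.lead('start_month').over(w), -1),
        F.lit(win_end).cast('date')))
    # one output row per month in [start_month, end_month]
    .withColumn('start_month', F.explode(
        F.expr('sequence(start_month, end_month, interval 1 month)')))
    .withColumn('month', F.date_format('start_month', 'yyyy-MM'))
    # trim months that fall before the reporting window
    .filter(F.col('start_month') >= F.lit(win_start).cast('date'))
    .drop('start_month', 'end_month'))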
Regarding the comment by @jxc, I have prepared the answer for this use case. Below are the code snippets.
Import the Spark SQL functions:
from pyspark.sql import functions as F, Window
Prepare the sample data:
simpleData = ((1,"Available",5,"2020-07"),
(1,"Available",8,"2020-08"),
(1,"Limited",8,"2020-12"),
(2,"Limited",1,"2020-09"),
(2,"Limited",3,"2020-12")
)
columns= ["product_id", "status", "price", "month"]
Create the dataframe from the sample data:
df = spark.createDataFrame(data = simpleData, schema = columns)
Add a date column to the dataframe to get the date in the proper format:
df0 = df.withColumn("date",F.to_date('month','yyyy-MM'))
df0.show()
+----------+---------+-----+-------+----------+
|product_id| status|price| month| date|
+----------+---------+-----+-------+----------+
| 1|Available| 5|2020-07|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|
| 1| Limited| 8|2020-12|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|
| 2| Limited| 3|2020-12|2020-12-01|
+----------+---------+-----+-------+----------+
- Create the WindowSpec w1 and use the window function lead over w1 to find the next date, then shift it back one month with add_months to set the end of each date sequence:
w1 = Window.partitionBy('product_id').orderBy('date')
df1 = df0.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
df1.show()
+----------+---------+-----+-------+----------+----------+
|product_id| status|price| month| date| end_date|
+----------+---------+-----+-------+----------+----------+
| 1|Available| 5|2020-07|2020-07-01|2020-07-01|
| 1|Available| 8|2020-08|2020-08-01|2020-11-01|
| 1| Limited| 8|2020-12|2020-12-01|2020-12-01|
| 2| Limited| 1|2020-09|2020-09-01|2020-11-01|
| 2| Limited| 3|2020-12|2020-12-01|2020-12-01|
+----------+---------+-----+-------+----------+----------+
- Use months_between(end_date, date) to calculate the number of months between the two dates, and use the transform function to iterate through sequence(0, #months), creating a named_struct with date=add_months(date,i) and price=IF(i=0,price,price); then use inline_outer to explode the array of structs:
df2 = df1.selectExpr(
    "product_id",
    "status",
    """
    inline_outer(
      transform(
        sequence(0, int(months_between(end_date, date)), 1),
        i -> (add_months(date, i) as date, IF(i=0, price, price) as price)
      )
    )
    """
)
df2.show()
+----------+---------+----------+-----+
|product_id| status| date|price|
+----------+---------+----------+-----+
| 1|Available|2020-07-01| 5|
| 1|Available|2020-08-01| 8|
| 1|Available|2020-09-01| 8|
| 1|Available|2020-10-01| 8|
| 1|Available|2020-11-01| 8|
| 1| Limited|2020-12-01| 8|
| 2| Limited|2020-09-01| 1|
| 2| Limited|2020-10-01| 1|
| 2| Limited|2020-11-01| 1|
| 2| Limited|2020-12-01| 3|
+----------+---------+----------+-----+
- Partition the dataframe on product_id and add a rank column in df3 to get the row number of each row. Then, for each product_id, store the maximum of the rank column values in a new column max_rank, and store max_rank into df4:
w2 = Window.partitionBy('product_id').orderBy('date')
df3 = df2.withColumn('rank',F.row_number().over(w2))
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
df3.show()
+----------+---------+----------+-----+----+
|product_id| status| date|price|rank|
+----------+---------+----------+-----+----+
| 1|Available|2020-07-01| 5| 1|
| 1|Available|2020-08-01| 8| 2|
| 1|Available|2020-09-01| 8| 3|
| 1|Available|2020-10-01| 8| 4|
| 1|Available|2020-11-01| 8| 5|
| 1| Limited|2020-12-01| 8| 6|
| 2| Limited|2020-09-01| 1| 1|
| 2| Limited|2020-10-01| 1| 2|
| 2| Limited|2020-11-01| 1| 3|
| 2| Limited|2020-12-01| 3| 4|
+----------+---------+----------+-----+----+
df4 = df3.groupBy("product_id").agg(F.max('rank').alias('max_rank'))
Schema: DataFrame[product_id: bigint, max_rank: int]
df4.show()
+----------+--------+
|product_id|max_rank|
+----------+--------+
| 1| 6|
| 2| 4|
+----------+--------+
- Join the df3 and df4 dataframes on product_id to get max_rank:
df5 = df3.join(df4,df3.product_id == df4.product_id,"inner") \
.select(df3.product_id,df3.status,df3.date,df3.price,df3.rank,df4.max_rank)
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int, max_rank: int]
df5.show()
+----------+---------+----------+-----+----+--------+
|product_id| status| date|price|rank|max_rank|
+----------+---------+----------+-----+----+--------+
| 1|Available|2020-07-01| 5| 1| 6|
| 1|Available|2020-08-01| 8| 2| 6|
| 1|Available|2020-09-01| 8| 3| 6|
| 1|Available|2020-10-01| 8| 4| 6|
| 1|Available|2020-11-01| 8| 5| 6|
| 1| Limited|2020-12-01| 8| 6| 6|
| 2| Limited|2020-09-01| 1| 1| 4|
| 2| Limited|2020-10-01| 1| 2| 4|
| 2| Limited|2020-11-01| 1| 3| 4|
| 2| Limited|2020-12-01| 3| 4| 4|
+----------+---------+----------+-----+----+--------+
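As a side note, the same max_rank could be computed without the groupBy and join above, using a window aggregate over the whole partition; a minimal equivalent sketch:
w3 = Window.partitionBy('product_id')
# attach the per-product maximum of rank directly to each row
df5 = df3.withColumn('max_rank', F.max('rank').over(w3))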
- Then, finally, filter the df5 dataframe using the between function to get the latest 6 months of data:
FinalResultDF = df5.filter(F.col('rank') \
.between(F.when((F.col('max_rank') > 5),(F.col('max_rank')-6)).otherwise(0),F.col('max_rank'))) \
.select(df5.product_id,df5.status,df5.date,df5.price)
FinalResultDF.show(truncate=False)
+----------+---------+----------+-----+
|product_id|status |date |price|
+----------+---------+----------+-----+
|1 |Available|2020-07-01|5 |
|1 |Available|2020-08-01|8 |
|1 |Available|2020-09-01|8 |
|1 |Available|2020-10-01|8 |
|1 |Available|2020-11-01|8 |
|1 |Limited |2020-12-01|8 |
|2 |Limited |2020-09-01|1 |
|2 |Limited |2020-10-01|1 |
|2 |Limited |2020-11-01|1 |
|2 |Limited |2020-12-01|3 |
+----------+---------+----------+-----+
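If the yyyy-MM month string from the question's expected output is preferred over the date column, a date_format call restores it (a small follow-up sketch, not part of the output above):
FinalResultDF.withColumn('month', F.date_format('date', 'yyyy-MM')) \
             .drop('date') \
             .show(truncate=False)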
Using spark-sql:
Given the input dataframe:
val df = spark.sql(""" with t1 (
select 1 c1, 'available' c2, 5 c3, '2019-10' c4 union all
select 1 c1, 'available' c2, 8 c3, '2020-08' c4 union all
select 1 c1, 'limited' c2, 8 c3, '2020-10' c4 union all
select 2 c1, 'limited' c2, 1 c3, '2020-09' c4 union all
select 2 c1, 'limited' c2, 3 c3, '2020-10' c4
) select c1 product_id, c2 status , c3 price, c4 month from t1
""")
df.createOrReplaceTempView("df")
df.show(false)
+----------+---------+-----+-------+
|product_id|status |price|month |
+----------+---------+-----+-------+
|1 |available|5 |2019-10|
|1 |available|8 |2020-08|
|1 |limited |8 |2020-10|
|2 |limited |1 |2020-09|
|2 |limited |3 |2020-10|
+----------+---------+-----+-------+
Filter the date window, i.e. the 6 months from 2020-07 to 2020-12, excluding the boundary months (they are handled separately below), and store the result in df1:
val df1 = spark.sql("""
select * from df where month > '2020-07' and month < '2020-12'
""")
df1.createOrReplaceTempView("df1")
df1.show(false)
+----------+---------+-----+-------+
|product_id|status |price|month |
+----------+---------+-----+-------+
|1 |available|8 |2020-08|
|1 |limited |8 |2020-10|
|2 |limited |1 |2020-09|
|2 |limited |3 |2020-10|
+----------+---------+-----+-------+
Lower bound - get the max month where month <= '2020-07', and overwrite its month as '2020-07':
val df2 = spark.sql("""
select product_id, status, price, '2020-07' month from df where (product_id,month) in
( select product_id, max(month) from df where month <= '2020-07' group by 1 )
""")
df2.createOrReplaceTempView("df2")
df2.show(false)
+----------+---------+-----+-------+
|product_id|status |price|month |
+----------+---------+-----+-------+
|1 |available|5 |2020-07|
+----------+---------+-----+-------+
Upper bound - get the max month where month <= '2020-12', and overwrite its month as '2020-12':
val df3 = spark.sql("""
select product_id, status, price, '2020-12' month from df where (product_id, month) in
( select product_id, max(month) from df where month <= '2020-12' group by 1 )
""")
df3.createOrReplaceTempView("df3")
df3.show(false)
+----------+-------+-----+-------+
|product_id|status |price|month |
+----------+-------+-----+-------+
|1 |limited|8 |2020-12|
|2 |limited|3 |2020-12|
+----------+-------+-----+-------+
Now union all three and store the result in df4:
val df4 = spark.sql("""
select product_id, status, price, month from df1 union all
select product_id, status, price, month from df2 union all
select product_id, status, price, month from df3
order by product_id, month
""")
df4.createOrReplaceTempView("df4")
df4.show(false)
+----------+---------+-----+-------+
|product_id|status |price|month |
+----------+---------+-----+-------+
|1 |available|5 |2020-07|
|1 |available|8 |2020-08|
|1 |limited |8 |2020-10|
|1 |limited |8 |2020-12|
|2 |limited |1 |2020-09|
|2 |limited |3 |2020-10|
|2 |limited |3 |2020-12|
+----------+---------+-----+-------+
Result: use sequence(date1, date2, interval 1 month) to generate an array of dates for the missing months, then explode the array to get the result:
spark.sql("""
select product_id, status, price, month, explode(dt) res_month from
(
select t1.*,
case when months_between(lm||'-01',month||'-01')=1.0 then array(month||'-01')
when month='2020-12' then array(month||'-01')
else sequence(to_date(month||'-01'), add_months(to_date(lm||'-01'),-1), interval 1 month )
end dt
from (
select product_id, status, price, month,
lead(month) over(partition by product_id order by month) lm
from df4
) t1
) t2
order by product_id, res_month
""")
.show(false)
+----------+---------+-----+-------+----------+
|product_id|status |price|month |res_month |
+----------+---------+-----+-------+----------+
|1 |available|5 |2020-07|2020-07-01|
|1 |available|8 |2020-08|2020-08-01|
|1 |available|8 |2020-08|2020-09-01|
|1 |limited |8 |2020-10|2020-10-01|
|1 |limited |8 |2020-10|2020-11-01|
|1 |limited |8 |2020-12|2020-12-01|
|2 |limited |1 |2020-09|2020-09-01|
|2 |limited |3 |2020-10|2020-10-01|
|2 |limited |3 |2020-10|2020-11-01|
|2 |limited |3 |2020-12|2020-12-01|
+----------+---------+-----+-------+----------+
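To match the question's expected format exactly, the outer query could keep only the generated month formatted back to yyyy-MM. A sketch in PySpark, where final is a hypothetical name for the dataframe returned by the query above (assigned instead of calling .show directly):
from pyspark.sql import functions as F

# "final" is assumed to hold the result of the spark.sql query above
final.select('product_id', 'status', 'price',
             F.date_format('res_month', 'yyyy-MM').alias('month')) \
     .show(truncate=False)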