在 Pyspark Dataframe (Spark v1.6) 中的一组记录中执行扫描操作

Perform a scan operation in a group of records in Pyspark Dataframe (Spark v1.6)

专家,我有一个要求,在一组记录中,我需要对 Pyspark 数据帧执行 "scan & Act" 操作。这是我的 pyspark 数据框(Spark 版本 1.6),有 2 个字段-

col1     effective_date
A        2019-02-01
A        2019-02-03
A        2019-02-05
A        2019-02-07

期望的结果是-

col1     effective_date expiry_date
A        2019-02-01     2019-02-02
A        2019-02-03     2019-02-04
A        2019-02-05     2019-02-06
A        2019-02-07     2999-12-31

这是典型的 SCD2(缓慢变化维度)操作,我的记录应按 effective_date(升序)排序,对于 "group of" col1 值, 每条记录都应具有下一条记录 (effective_date-1) 的 expiry_date,组中最大 effective_date 的最后一条记录应假定 expiry_date 为“2999-12 -31" 因为它是最新的记录并且它还没有未来的实例,但是如果将来确实出现了某些东西,它应该会过期。 例如-

col1     effective_date expiry_date
A        2019-02-01     2019-02-02
A        2019-02-03     2019-02-04
A        2019-02-05     2019-02-06
A        2019-02-07     2019-02-08
A        2019-02-09     2999-12-31

你要用的是一个Window函数

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("col1").orderBy("effective_date")
df = df.withColumn("expiry_date", F.date_sub(F.lead("effective_date").over(w), 1))