在 Pyspark Dataframe (Spark v1.6) 中的一组记录中执行扫描操作
Perform a scan operation in a group of records in Pyspark Dataframe (Spark v1.6)
专家,我有一个要求,在一组记录中,我需要对 Pyspark 数据帧执行 "scan & Act" 操作。这是我的 pyspark 数据框(Spark 版本 1.6),有 2 个字段-
col1 effective_date
A 2019-02-01
A 2019-02-03
A 2019-02-05
A 2019-02-07
期望的结果是-
col1 effective_date expiry_date
A 2019-02-01 2019-02-02
A 2019-02-03 2019-02-04
A 2019-02-05 2019-02-06
A 2019-02-07 2999-12-31
这是典型的 SCD2(缓慢变化维度)操作,我的记录应按 effective_date(升序)排序,对于 "group of" col1 值,
每条记录都应具有下一条记录 (effective_date-1) 的 expiry_date,组中最大 effective_date 的最后一条记录应假定 expiry_date 为“2999-12 -31"
因为它是最新的记录并且它还没有未来的实例,但是如果将来确实出现了某些东西,它应该会过期。
例如-
col1 effective_date expiry_date
A 2019-02-01 2019-02-02
A 2019-02-03 2019-02-04
A 2019-02-05 2019-02-06
A 2019-02-07 2019-02-08
A 2019-02-09 2999-12-31
你要用的是一个Window函数
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy("col1").orderBy("effective_date")
df = df.withColumn("expiry_date", F.date_sub(F.lead("effective_date").over(w), 1))
专家,我有一个要求,在一组记录中,我需要对 Pyspark 数据帧执行 "scan & Act" 操作。这是我的 pyspark 数据框(Spark 版本 1.6),有 2 个字段-
col1 effective_date
A 2019-02-01
A 2019-02-03
A 2019-02-05
A 2019-02-07
期望的结果是-
col1 effective_date expiry_date
A 2019-02-01 2019-02-02
A 2019-02-03 2019-02-04
A 2019-02-05 2019-02-06
A 2019-02-07 2999-12-31
这是典型的 SCD2(缓慢变化维度)操作,我的记录应按 effective_date(升序)排序,对于 "group of" col1 值, 每条记录都应具有下一条记录 (effective_date-1) 的 expiry_date,组中最大 effective_date 的最后一条记录应假定 expiry_date 为“2999-12 -31" 因为它是最新的记录并且它还没有未来的实例,但是如果将来确实出现了某些东西,它应该会过期。 例如-
col1 effective_date expiry_date
A 2019-02-01 2019-02-02
A 2019-02-03 2019-02-04
A 2019-02-05 2019-02-06
A 2019-02-07 2019-02-08
A 2019-02-09 2999-12-31
你要用的是一个Window函数
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy("col1").orderBy("effective_date")
df = df.withColumn("expiry_date", F.date_sub(F.lead("effective_date").over(w), 1))