
PySpark: Generate a column from another column by cumulatively appending its values up to the current row


I want to generate column B for each product in the Product column.

I tried using PySpark's lead/lag functions, but could not get them to produce it correctly.


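F.collect_list over a window cumulatively appends the lists from column A, giving a list of lists for each row.
explode followed by collect_list then merges that list of lists into a single list.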
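
To make the two steps concrete, here is a plain-Python sketch (no Spark involved) of what each row should end up with for product A. The names rows, seen and cum_lists are made up for this illustration and are not part of the Spark code.

```python
from itertools import chain

# Column A for product "A", already sorted by year (sample values from the question).
rows = [(2010, [1, 2, 3]), (2011, [4, 5, 6]), (2012, [7, 8])]

cum_lists = {}
seen = []  # plays the role of collect_list over the ordered window
for year, values in rows:
    seen.append(values)  # list of lists up to and including this row
    # Flatten the list of lists into one list (what explode + collect_list achieves).
    cum_lists[year] = list(chain.from_iterable(seen))

for year, merged in cum_lists.items():
    print(year, merged)
```

Each year's entry contains every value seen so far for that product, which is the behaviour the window version is meant to reproduce.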

Your df:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Reuse the active session (or start one) so the snippet runs on its own.
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("product", StringType()),
    StructField("year", IntegerType()),
    StructField("A", ArrayType(IntegerType())),
])

data = [['A', 2010, [1, 2, 3]], ['A', 2011, [4, 5, 6]], ['A', 2012, [7, 8]],
        ['B', 2009, [10, 11, 12]], ['B', 2010, [16, 17]], ['B', 2011, [20, 21, 22, 23]], ['B', 2012, [24]]]

df = spark.createDataFrame(data, schema=schema)

# Ordered window: each row sees all rows of its product up to the current year.
w = Window.partitionBy("product").orderBy("year")

(df
    # List of lists: every A array up to the current row.
    .withColumn("first_list", F.collect_list("A").over(w))
    # One row per inner array.
    .withColumn("first_explode", F.explode(F.col("first_list")))
    # One row per element.
    .withColumn("second_explode", F.explode(F.col("first_explode")))
    # Gather the elements back into one flat list per (product, year).
    # Note: collect_list does not strictly guarantee element order after a shuffle.
    .withColumn("cum_list", F.collect_list("second_explode").over(Window.partitionBy("product", "year")))
    .drop("first_list", "first_explode", "second_explode").distinct()
    .orderBy("product", "year").show(truncate=False))

(If you don't want duplicates, use F.collect_set on second_explode instead. Note that collect_set does not preserve element order.)


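+-------+----+----------------+----------------------------------------+
|product|year|A               |cum_list                                |
+-------+----+----------------+----------------------------------------+
|A      |2010|[1, 2, 3]       |[1, 2, 3]                               |
|A      |2011|[4, 5, 6]       |[1, 2, 3, 4, 5, 6]                      |
|A      |2012|[7, 8]          |[1, 2, 3, 4, 5, 6, 7, 8]                |
|B      |2009|[10, 11, 12]    |[10, 11, 12]                            |
|B      |2010|[16, 17]        |[10, 11, 12, 16, 17]                    |
|B      |2011|[20, 21, 22, 23]|[10, 11, 12, 16, 17, 20, 21, 22, 23]    |
|B      |2012|[24]            |[10, 11, 12, 16, 17, 20, 21, 22, 23, 24]|
+-------+----+----------------+----------------------------------------+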