Pyspark: Generate a column based on another column that has repeatedly appended values up to the current row
Here is my dataframe:
I want to generate column B for each product in the Product column.
I tried pyspark's lead/lag functions, but could not get them to produce this exactly.
Use the F.collect_list and explode functions. F.collect_list over a window will cumulatively append the lists from column A, and explode together with collect_list can then merge that list of lists into a single one.
Your df:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.window import Window
from pyspark.sql import functions as F

schema = StructType([StructField("product", StringType()),
                     StructField("year", IntegerType()),
                     StructField("A", ArrayType(IntegerType()))])
data = [['A', 2010, [1, 2, 3]], ['A', 2011, [4, 5, 6]], ['A', 2012, [7, 8]],
        ['B', 2009, [10, 11, 12]], ['B', 2010, [16, 17]], ['B', 2011, [20, 21, 22, 23]], ['B', 2012, [24]]]
# assumes an active SparkSession named `spark` (e.g. the pyspark shell)
df = spark.createDataFrame(data, schema=schema)

# window over all rows of the same product up to and including the current year
w = Window.partitionBy("product").orderBy("year")

# first_list   : cumulative list of arrays per product up to the current year
# first_explode / second_explode : unpack the nested arrays into individual values
# cum_list     : re-collect the values into one flat list per (product, year)
df.withColumn("first_list", F.collect_list("A").over(w))\
  .withColumn("first_explode", F.explode(F.col("first_list")))\
  .withColumn("second_explode", F.explode(F.col("first_explode")))\
  .withColumn("cum_list", F.collect_list("second_explode").over(Window.partitionBy("product", "year")))\
  .drop("first_list", "first_explode", "second_explode").distinct()\
  .orderBy("product", "year").show(truncate=False)
(If you do not want duplicate values, use F.collect_set on second_explode instead, as in the sketch below.)
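A minimal sketch of that variant, reusing df and w from above; note that collect_set does not guarantee the order of the elements:
# Same pipeline, but collect_set drops duplicate values while accumulating.
# Element order inside cum_list is not guaranteed with collect_set.
df.withColumn("first_list", F.collect_list("A").over(w))\
  .withColumn("first_explode", F.explode(F.col("first_list")))\
  .withColumn("second_explode", F.explode(F.col("first_explode")))\
  .withColumn("cum_list", F.collect_set("second_explode").over(Window.partitionBy("product", "year")))\
  .drop("first_list", "first_explode", "second_explode").distinct()\
  .orderBy("product", "year").show(truncate=False)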
Output:
+-------+----+----------------+----------------------------------------+
|product|year|A |cum_list |
+-------+----+----------------+----------------------------------------+
|A |2010|[1, 2, 3] |[1, 2, 3] |
|A |2011|[4, 5, 6] |[1, 2, 3, 4, 5, 6] |
|A |2012|[7, 8] |[1, 2, 3, 4, 5, 6, 7, 8] |
|B |2009|[10, 11, 12] |[10, 11, 12] |
|B |2010|[16, 17] |[10, 11, 12, 16, 17] |
|B |2011|[20, 21, 22, 23]|[10, 11, 12, 16, 17, 20, 21, 22, 23] |
|B |2012|[24] |[10, 11, 12, 16, 17, 20, 21, 22, 23, 24]|
+-------+----+----------------+----------------------------------------+
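If you are on Spark 2.4 or later, F.flatten should let you skip the double explode/collect round trip entirely; a minimal sketch, reusing df and w from above:
# Sketch assuming Spark 2.4+ (pyspark.sql.functions.flatten is available).
# collect_list over the ordered window builds the cumulative array of arrays,
# and flatten merges it into one flat list per row.
df.withColumn("cum_list", F.flatten(F.collect_list("A").over(w)))\
  .orderBy("product", "year").show(truncate=False)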