如何合并多行删除重复项并连接其他列值

How to merge multiple rows removing duplicates and concatenate other column values

我有以下包含 4 列(A、B、C、D)的数据框:

df=

A B C D
123 001 ABC DEF
123 001 ABC DEG
256 002 XXY DSA
256 002 XXX DSA

从上面的数据框中,我想按 A 列和 B 列分组,并用分号“;”聚合和连接按 C 和 D 列,同时确保不连接重复值。以下是预期结果。

df_agg =

A B C D
123 001 ABC DEF;DEG;
256 002 XXY;XXX; DSA

我目前有下面的 groupby 函数,但它似乎没有删除重复的字符串,而是连接整个字符串。

df_agg = df.groupby(groupby_list).agg( *[array_join(collect_list(column), ";").alias(column) for column in df.columns if column not in groupby_list] )

此外,我如何确保添加';'循环结束后?

您需要使用 collect_set 进行聚合 - 它会删除重复项。 array_join 在元素之间添加分隔符。 concat 将结果与 ; 或末尾的空字符串连接起来。

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(123, '001', 'ABC', 'DEF'),
     (123, '001', 'ABC', 'DEG'),
     (256, '002', 'XXY', 'DSA'),
     (256, '002', 'XXX', 'DSA')],
    ['A', 'B', 'C', 'D']
)

df_agg = df.groupBy('A', 'B').agg(
    *[F.concat(F.array_join(F.collect_set(c), ';'),
             F.when(F.size(F.collect_set(c)) > 1, ';').otherwise('')).alias(c)
     for c in {'C', 'D'}]
)
df_agg.show()
# +---+---+--------+--------+
# |  A|  B|       C|       D|
# +---+---+--------+--------+
# |123|001|     ABC|DEF;DEG;|
# |256|002|XXX;XXY;|     DSA|
# +---+---+--------+--------+

groupbycollect_set,集合只取唯一值。使用 array_join 函数将数组元素与所需的分隔符连接起来。下面的代码

df.groupby('A','B').agg(*[array_join(collect_set(x),';').alias(x) for x in df.drop('A','B').columns]).show(truncate=0)

+---+---+-------+-------+
|A  |B  |C      |D      |
+---+---+-------+-------+
|123|001|ABC    |DEF;DEG|
|256|002|XXX;XXY|DSA    |
+---+---+-------+-------+