在分组时将字符串与另一列 pyspark 连接起来

Concatenate string on grouping with the other column pyspark

我的数据框如下

+----------+--------------------+
|CustomerNo| desc               |
+----------+--------------------+
|  351856.0| FORM & BEAUTY 075 P|
|  351856.0| FORM & BEAUTY 075 P|
|  326022.0|            D 151 HP|
|   69430.0|Shape Sensation 0...|
|   38018.0|   Maximizer 846 WHU|
|   69712.0|Shape Sensation 0...|
|   71228.0|   Aqua Festive WHUD|
|   71228.0|Maximizer 736 WHU...|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   74540.0|Form & Beauty 052 HP|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
+----------+--------------------+

我需要按 CustomerNo 上的数据分组并连接字符串列。

我正在使用以下代码,但出现错误

df = retail_df.groupBy('CustomerNo').agg(F.concat('desc').alias('concat_Desc'))

谁能告诉我怎么做?

不太清楚你想要的输出是什么,但如果我理解正确的话,你会喜欢下面的解决方案,它使用 collect_list 将所有项目分组到一个数组中,并且 udf 将此类数组的元素连接成一个字符串:

import pyspark.sql.functions as F

@F.udf('string')
def concat_into_string(l):
  return ' - '.join(l)

df = retail_df \
  .groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
  .withColumn('final_string', concat_into_string('desc'))

示例

df = spark.createDataFrame([
  (1, 'A'),
  (1, 'A'),
  (2, 'B'),
  (3, 'C1'),
  (3, 'C2'),
  (4, 'D'),
  (4, 'D'),
  (4, 'D'),
  (4, 'D')
], ('CustomerNo', 'desc'))

df \
  .groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
  .withColumn('final_string', concat_into_string('desc')) \
  .show()

+----------+------------+-------------+
|CustomerNo|        desc| final_string|
+----------+------------+-------------+
|         1|      [A, A]|        A - A|
|         2|         [B]|            B|
|         3|    [C1, C2]|      C1 - C2|
|         4|[D, D, D, D]|D - D - D - D|
+----------+------------+-------------+

您可以对 CustomerNo 上的数据框进行分组,然后进行收集列表。接下来,您可以使用 concat_ws

连接单列列表的项目

看下面的代码,

retail_df \
    .groupBy('CustomerNo') \
    .agg(F.collect_list('desc').alias('items')) \
    .withColumn("concat_Desc", F.concat_ws(",", "items"))

此解决方案未使用 udf,因此在性能方面会更好。

import pyspark.sql.functions as F
df = spark.read.option("inferschema","true").option("header","true").csv("/FileStore/tables/test.csv")
print("Sample Data")
df.select("eid","ename").show()
print("")
print("")
print("Final Ouput")
df.select("eid","ename").groupBy("eid").agg(F.concat_ws(", ", F.collect_list("ename")).alias("desc")).show()

Sample Data
+---+-----+
|eid|ename|
+---+-----+
|  1|    a|
|  1|    b|
|  2|    c|
|  2|    d|
|  3|    e|
|  4|    f|
|  4|    g|
+---+-----+



Final Ouput
+---+----+
|eid|desc|
+---+----+
|  1|a, b|
|  3|   e|
|  4|f, g|
|  2|c, d|
+---+----+