在分组时将字符串与另一列 pyspark 连接起来
Concatenate string on grouping with the other column pyspark
我的数据框如下
+----------+--------------------+
|CustomerNo| desc |
+----------+--------------------+
| 351856.0| FORM & BEAUTY 075 P|
| 351856.0| FORM & BEAUTY 075 P|
| 326022.0| D 151 HP|
| 69430.0|Shape Sensation 0...|
| 38018.0| Maximizer 846 WHU|
| 69712.0|Shape Sensation 0...|
| 71228.0| Aqua Festive WHUD|
| 71228.0|Maximizer 736 WHU...|
| 73200.0| T-Shirt Bra 081 HP|
| 73200.0| T-Shirt Bra 081 HP|
| 73200.0| T-Shirt Bra 081 HP|
| 74540.0|Form & Beauty 052 HP|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
+----------+--------------------+
我需要按 CustomerNo
上的数据分组并连接字符串列。
我正在使用以下代码,但出现错误
df = retail_df.groupBy('CustomerNo').agg(F.concat('desc').alias('concat_Desc'))
谁能告诉我怎么做?
不太清楚你想要的输出是什么,但如果我理解正确的话,你会喜欢下面的解决方案,它使用 collect_list
将所有项目分组到一个数组中,并且 udf
将此类数组的元素连接成一个字符串:
import pyspark.sql.functions as F
@F.udf('string')
def concat_into_string(l):
return ' - '.join(l)
df = retail_df \
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
.withColumn('final_string', concat_into_string('desc'))
示例
df = spark.createDataFrame([
(1, 'A'),
(1, 'A'),
(2, 'B'),
(3, 'C1'),
(3, 'C2'),
(4, 'D'),
(4, 'D'),
(4, 'D'),
(4, 'D')
], ('CustomerNo', 'desc'))
df \
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
.withColumn('final_string', concat_into_string('desc')) \
.show()
+----------+------------+-------------+
|CustomerNo| desc| final_string|
+----------+------------+-------------+
| 1| [A, A]| A - A|
| 2| [B]| B|
| 3| [C1, C2]| C1 - C2|
| 4|[D, D, D, D]|D - D - D - D|
+----------+------------+-------------+
您可以对 CustomerNo 上的数据框进行分组,然后进行收集列表。接下来,您可以使用 concat_ws
连接单列列表的项目
看下面的代码,
retail_df \
.groupBy('CustomerNo') \
.agg(F.collect_list('desc').alias('items')) \
.withColumn("concat_Desc", F.concat_ws(",", "items"))
此解决方案未使用 udf,因此在性能方面会更好。
import pyspark.sql.functions as F
df = spark.read.option("inferschema","true").option("header","true").csv("/FileStore/tables/test.csv")
print("Sample Data")
df.select("eid","ename").show()
print("")
print("")
print("Final Ouput")
df.select("eid","ename").groupBy("eid").agg(F.concat_ws(", ", F.collect_list("ename")).alias("desc")).show()
Sample Data
+---+-----+
|eid|ename|
+---+-----+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
| 3| e|
| 4| f|
| 4| g|
+---+-----+
Final Ouput
+---+----+
|eid|desc|
+---+----+
| 1|a, b|
| 3| e|
| 4|f, g|
| 2|c, d|
+---+----+
我的数据框如下
+----------+--------------------+
|CustomerNo| desc |
+----------+--------------------+
| 351856.0| FORM & BEAUTY 075 P|
| 351856.0| FORM & BEAUTY 075 P|
| 326022.0| D 151 HP|
| 69430.0|Shape Sensation 0...|
| 38018.0| Maximizer 846 WHU|
| 69712.0|Shape Sensation 0...|
| 71228.0| Aqua Festive WHUD|
| 71228.0|Maximizer 736 WHU...|
| 73200.0| T-Shirt Bra 081 HP|
| 73200.0| T-Shirt Bra 081 HP|
| 73200.0| T-Shirt Bra 081 HP|
| 74540.0|Form & Beauty 052 HP|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
| 74578.0| G 56 WP 01|
+----------+--------------------+
我需要按 CustomerNo
上的数据分组并连接字符串列。
我正在使用以下代码,但出现错误
df = retail_df.groupBy('CustomerNo').agg(F.concat('desc').alias('concat_Desc'))
谁能告诉我怎么做?
不太清楚你想要的输出是什么,但如果我理解正确的话,你会喜欢下面的解决方案,它使用 collect_list
将所有项目分组到一个数组中,并且 udf
将此类数组的元素连接成一个字符串:
import pyspark.sql.functions as F
@F.udf('string')
def concat_into_string(l):
return ' - '.join(l)
df = retail_df \
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
.withColumn('final_string', concat_into_string('desc'))
示例
df = spark.createDataFrame([
(1, 'A'),
(1, 'A'),
(2, 'B'),
(3, 'C1'),
(3, 'C2'),
(4, 'D'),
(4, 'D'),
(4, 'D'),
(4, 'D')
], ('CustomerNo', 'desc'))
df \
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) \
.withColumn('final_string', concat_into_string('desc')) \
.show()
+----------+------------+-------------+
|CustomerNo| desc| final_string|
+----------+------------+-------------+
| 1| [A, A]| A - A|
| 2| [B]| B|
| 3| [C1, C2]| C1 - C2|
| 4|[D, D, D, D]|D - D - D - D|
+----------+------------+-------------+
您可以对 CustomerNo 上的数据框进行分组,然后进行收集列表。接下来,您可以使用 concat_ws
连接单列列表的项目看下面的代码,
retail_df \
.groupBy('CustomerNo') \
.agg(F.collect_list('desc').alias('items')) \
.withColumn("concat_Desc", F.concat_ws(",", "items"))
此解决方案未使用 udf,因此在性能方面会更好。
import pyspark.sql.functions as F
df = spark.read.option("inferschema","true").option("header","true").csv("/FileStore/tables/test.csv")
print("Sample Data")
df.select("eid","ename").show()
print("")
print("")
print("Final Ouput")
df.select("eid","ename").groupBy("eid").agg(F.concat_ws(", ", F.collect_list("ename")).alias("desc")).show()
Sample Data
+---+-----+
|eid|ename|
+---+-----+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
| 3| e|
| 4| f|
| 4| g|
+---+-----+
Final Ouput
+---+----+
|eid|desc|
+---+----+
| 1|a, b|
| 3| e|
| 4|f, g|
| 2|c, d|
+---+----+