在 pyspark 中合并来自多行的文本
combine text from multiple rows in pyspark
我使用以下代码创建了一个 PySpark 数据框
testlist = [
{"category":"A","name":"A1"},
{"category":"A","name":"A2"},
{"category":"B","name":"B1"},
{"category":"B","name":"B2"}
]
spark_df = spark.createDataFrame(testlist)
结果:
category name
A A1
A A2
B B1
B B2
我想让它显示如下:
category name
A A1, A2
B B1, B2
我试过下面的代码,但不起作用
spark_df.groupby('category').agg('name', lambda x:x + ', ')
任何人都可以帮助确定我做错了什么以及实现这一目标的最佳方法吗?
一种选择是使用 pyspark.sql.functions.collect_list()
作为聚合函数。
from pyspark.sql.functions import collect_list
grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name"))
这会将 name
的值收集到列表中,结果输出如下所示:
grouped_df.show()
#+---------+---------+
#|category |name |
#+---------+---------+
#|A |[A1, A2] |
#|B |[B1, B2] |
#+---------+---------+
更新 2019-06-10:
如果您希望将输出作为连接字符串,则可以使用 pyspark.sql.functions.concat_ws
to concatenate the values of the collected list, which will be :
from pyspark.sql.functions import concat_ws
grouped_df.withColumn("name", concat_ws(", ", "name")).show()
#+---------+-------+
#|category |name |
#+---------+-------+
#|A |A1, A2 |
#|B |B1, B2 |
#+---------+-------+
原始答案:如果您希望将输出作为连接字符串,您必须 可以 使用一个 udf
。例如,您可以先执行上述 groupBy()
,然后应用 udf
加入收集列表:
from pyspark.sql.functions import udf
concat_list = udf(lambda lst: ", ".join(lst), StringType())
grouped_df.withColumn("name", concat_list("name")).show()
#+---------+-------+
#|category |name |
#+---------+-------+
#|A |A1, A2 |
#|B |B1, B2 |
#+---------+-------+
另一个选项是这个
>>> df.rdd.reduceByKey(lambda x,y: x+','+y).toDF().show()
+---+-----+
| _1| _2|
+---+-----+
| A|A1,A2|
| B|B1,B2|
+---+-----+
我使用以下代码创建了一个 PySpark 数据框
testlist = [
{"category":"A","name":"A1"},
{"category":"A","name":"A2"},
{"category":"B","name":"B1"},
{"category":"B","name":"B2"}
]
spark_df = spark.createDataFrame(testlist)
结果:
category name
A A1
A A2
B B1
B B2
我想让它显示如下:
category name
A A1, A2
B B1, B2
我试过下面的代码,但不起作用
spark_df.groupby('category').agg('name', lambda x:x + ', ')
任何人都可以帮助确定我做错了什么以及实现这一目标的最佳方法吗?
一种选择是使用 pyspark.sql.functions.collect_list()
作为聚合函数。
from pyspark.sql.functions import collect_list
grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name"))
这会将 name
的值收集到列表中,结果输出如下所示:
grouped_df.show()
#+---------+---------+
#|category |name |
#+---------+---------+
#|A |[A1, A2] |
#|B |[B1, B2] |
#+---------+---------+
更新 2019-06-10:
如果您希望将输出作为连接字符串,则可以使用 pyspark.sql.functions.concat_ws
to concatenate the values of the collected list, which will be
from pyspark.sql.functions import concat_ws
grouped_df.withColumn("name", concat_ws(", ", "name")).show()
#+---------+-------+
#|category |name |
#+---------+-------+
#|A |A1, A2 |
#|B |B1, B2 |
#+---------+-------+
原始答案:如果您希望将输出作为连接字符串,您必须 可以 使用一个 udf
。例如,您可以先执行上述 groupBy()
,然后应用 udf
加入收集列表:
from pyspark.sql.functions import udf
concat_list = udf(lambda lst: ", ".join(lst), StringType())
grouped_df.withColumn("name", concat_list("name")).show()
#+---------+-------+
#|category |name |
#+---------+-------+
#|A |A1, A2 |
#|B |B1, B2 |
#+---------+-------+
另一个选项是这个
>>> df.rdd.reduceByKey(lambda x,y: x+','+y).toDF().show()
+---+-----+
| _1| _2|
+---+-----+
| A|A1,A2|
| B|B1,B2|
+---+-----+