Spark SQL 替换 MySQL 的 GROUP_CONCAT 聚合函数
Spark SQL replacement for MySQL's GROUP_CONCAT aggregate function
我有一个 table 两个字符串类型的列 (username, friend)
并且对于每个用户名,我想将其所有朋友收集在一行中,并连接为字符串。例如:('username1', 'friends1, friends2, friends3')
我知道 MySQL 用 GROUP_CONCAT
做这个。有什么办法可以用 Spark SQL?
在您继续之前:此操作是另一个 groupByKey
。虽然它有多个合法应用程序,但相对昂贵,因此请务必仅在需要时使用它。
不是很简洁或高效的解决方案,但您可以使用 Spark 1.5.0 中引入的 UserDefinedAggregateFunction
:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
用法示例:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
您还可以创建一个 Python 包装器,如 Spark: How to map Python with Scala or Java User Defined Functions?
所示
实际上,提取 RDD、groupByKey
、mkString
并重建 DataFrame 会更快。
您可以通过将 collect_list
函数 (Spark >= 1.6.0) 与 concat_ws
:
结合使用来获得类似的效果
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
使用 pyspark < 1.6 的一种方法,不幸的是它不支持 user-defined 聚合函数:
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
如果您想再次将其设为数据框:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
从 1.6 开始,您可以使用 collect_list 然后加入创建的列表:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
你可以试试collect_list函数
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
或者你可以像这样注册一个 UDF
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
你可以在查询中使用这个函数
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
语言:Scala
Spark 版本:1.5.2
我遇到了同样的问题,也尝试使用 udfs
来解决它,但不幸的是,由于类型不一致,这在后面的代码中导致了更多问题。我能够通过首先将 DF
转换为 RDD
然后 按 分组并以所需方式处理数据然后转换RDD
回一个DF
如下:
val df = sc
.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")))
.toDF("username", "friend")
+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+
val dfGRPD = df.map(Row => (Row(0), Row(1)))
.groupByKey()
.map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
.toDF("username", "groupOfFriends")
+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+
这是一个可以在 PySpark 中使用的函数:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep=','):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby('username').agg(F.group_concat('friends').alias('friends'))
在SQL中:
select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
下面是实现 group_concat 功能的基于 python 的代码。
输入数据:
Cust_No,Cust_Cars
1、丰田
2、宝马
1、奥迪
2、现代
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
spark = SparkSession.builder.master('yarn').getOrCreate()
# Udf to join all list elements with "|"
def combine_cars(car_list,sep='|'):
collect = sep.join(car_list)
return collect
test_udf = udf(combine_cars,StringType())
car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)
输出数据:
Cust_No、Final_List
1、丰田|奥迪
2、宝马|现代
在 Spark 2.4+ 中,这在 collect_list()
和 array_join()
的帮助下变得更简单了。
这是 PySpark 中的演示,尽管代码对于 Scala 也应该非常相似:
from pyspark.sql.functions import array_join, collect_list
friends = spark.createDataFrame(
[
('jacques', 'nicolas'),
('jacques', 'georges'),
('jacques', 'francois'),
('bob', 'amelie'),
('bob', 'zoe'),
],
schema=['username', 'friend'],
)
(
friends
.orderBy('friend', ascending=False)
.groupBy('username')
.agg(
array_join(
collect_list('friend'),
delimiter=', ',
).alias('friends')
)
.show(truncate=False)
)
输出:
+--------+--------------------------+
|username|friends |
+--------+--------------------------+
|jacques |nicolas, georges, francois|
|bob |zoe, amelie |
+--------+--------------------------+
这类似于 MySQL 的 GROUP_CONCAT()
and Redshift's LISTAGG()
。
--火花SQL决议与collect_set
SELECT id, concat_ws(', ', sort_array( collect_set(colors))) as csv_colors
FROM (
VALUES ('A', 'green'),('A','yellow'),('B', 'blue'),('B','green')
) as T (id, colors)
GROUP BY id
您还可以使用 Spark SQL 函数 collect_list,之后您需要转换为字符串并使用函数 regexp_replace 替换特殊字符。
regexp_replace(regexp_replace(regexp_replace(cast(collect_list((column)) as string), ' ', ''), ',', '|'), '[^A-Z0-9|]', '')
这是一种更简单的方法。
高阶函数 concat_ws()
和 collect_list()
可以与 [=19= 一起作为一个很好的选择]groupBy()
import pyspark.sql.functions as F
df_grp = df.groupby("agg_col").agg(F.concat_ws("#;", F.collect_list(df.time)).alias("time"), F.concat_ws("#;", F.collect_list(df.status)).alias("status"), F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"))
示例输出
+-------+------------------+----------------+---------------------+
|agg_col|time |status |llamaType |
+-------+------------------+----------------+---------------------+
|1 |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
+-------+------------------+----------------+---------------------+
我有一个 table 两个字符串类型的列 (username, friend)
并且对于每个用户名,我想将其所有朋友收集在一行中,并连接为字符串。例如:('username1', 'friends1, friends2, friends3')
我知道 MySQL 用 GROUP_CONCAT
做这个。有什么办法可以用 Spark SQL?
在您继续之前:此操作是另一个 groupByKey
。虽然它有多个合法应用程序,但相对昂贵,因此请务必仅在需要时使用它。
不是很简洁或高效的解决方案,但您可以使用 Spark 1.5.0 中引入的 UserDefinedAggregateFunction
:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
用法示例:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
您还可以创建一个 Python 包装器,如 Spark: How to map Python with Scala or Java User Defined Functions?
所示实际上,提取 RDD、groupByKey
、mkString
并重建 DataFrame 会更快。
您可以通过将 collect_list
函数 (Spark >= 1.6.0) 与 concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
使用 pyspark < 1.6 的一种方法,不幸的是它不支持 user-defined 聚合函数:
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
如果您想再次将其设为数据框:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
从 1.6 开始,您可以使用 collect_list 然后加入创建的列表:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
你可以试试collect_list函数
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
或者你可以像这样注册一个 UDF
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
你可以在查询中使用这个函数
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
语言:Scala Spark 版本:1.5.2
我遇到了同样的问题,也尝试使用 udfs
来解决它,但不幸的是,由于类型不一致,这在后面的代码中导致了更多问题。我能够通过首先将 DF
转换为 RDD
然后 按 分组并以所需方式处理数据然后转换RDD
回一个DF
如下:
val df = sc
.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")))
.toDF("username", "friend")
+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+
val dfGRPD = df.map(Row => (Row(0), Row(1)))
.groupByKey()
.map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
.toDF("username", "groupOfFriends")
+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+
这是一个可以在 PySpark 中使用的函数:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep=','):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby('username').agg(F.group_concat('friends').alias('friends'))
在SQL中:
select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
下面是实现 group_concat 功能的基于 python 的代码。
输入数据:
Cust_No,Cust_Cars
1、丰田
2、宝马
1、奥迪
2、现代
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
spark = SparkSession.builder.master('yarn').getOrCreate()
# Udf to join all list elements with "|"
def combine_cars(car_list,sep='|'):
collect = sep.join(car_list)
return collect
test_udf = udf(combine_cars,StringType())
car_list_per_customer.groupBy("Cust_No").agg(F.collect_list("Cust_Cars").alias("car_list")).select("Cust_No",test_udf("car_list").alias("Final_List")).show(20,False)
输出数据: Cust_No、Final_List
1、丰田|奥迪
2、宝马|现代
在 Spark 2.4+ 中,这在 collect_list()
和 array_join()
的帮助下变得更简单了。
这是 PySpark 中的演示,尽管代码对于 Scala 也应该非常相似:
from pyspark.sql.functions import array_join, collect_list
friends = spark.createDataFrame(
[
('jacques', 'nicolas'),
('jacques', 'georges'),
('jacques', 'francois'),
('bob', 'amelie'),
('bob', 'zoe'),
],
schema=['username', 'friend'],
)
(
friends
.orderBy('friend', ascending=False)
.groupBy('username')
.agg(
array_join(
collect_list('friend'),
delimiter=', ',
).alias('friends')
)
.show(truncate=False)
)
输出:
+--------+--------------------------+
|username|friends |
+--------+--------------------------+
|jacques |nicolas, georges, francois|
|bob |zoe, amelie |
+--------+--------------------------+
这类似于 MySQL 的 GROUP_CONCAT()
and Redshift's LISTAGG()
。
--火花SQL决议与collect_set
SELECT id, concat_ws(', ', sort_array( collect_set(colors))) as csv_colors
FROM (
VALUES ('A', 'green'),('A','yellow'),('B', 'blue'),('B','green')
) as T (id, colors)
GROUP BY id
您还可以使用 Spark SQL 函数 collect_list,之后您需要转换为字符串并使用函数 regexp_replace 替换特殊字符。
regexp_replace(regexp_replace(regexp_replace(cast(collect_list((column)) as string), ' ', ''), ',', '|'), '[^A-Z0-9|]', '')
这是一种更简单的方法。
高阶函数 concat_ws()
和 collect_list()
可以与 [=19= 一起作为一个很好的选择]groupBy()
import pyspark.sql.functions as F
df_grp = df.groupby("agg_col").agg(F.concat_ws("#;", F.collect_list(df.time)).alias("time"), F.concat_ws("#;", F.collect_list(df.status)).alias("status"), F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"))
示例输出
+-------+------------------+----------------+---------------------+
|agg_col|time |status |llamaType |
+-------+------------------+----------------+---------------------+
|1 |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
+-------+------------------+----------------+---------------------+