Spark 多个动态聚合函数,countDistinct 不工作
Spark multiple dynamic aggregate functions, countDistinct not working
使用多个动态聚合操作在 Spark 数据帧上进行聚合。
我想使用 Scala 和多个动态聚合操作(由用户在 JSON 中传递)对 Spark 数据帧进行聚合。我正在将 JSON 转换为 Map
.
下面是一些示例数据:
colA colB colC colD
1 2 3 4
5 6 7 8
9 10 11 12
我使用的Spark聚合代码:
var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)
我必须将 aggFuncMap
作为 Map
传递,以便用户可以通过 JSON 配置传递任意数量的聚合。
以上代码对于某些聚合工作正常,包括 sum
、min
、max
、avg
和 count
.
然而,不幸的是,此代码不适用于 countDistinct
(可能是因为它是驼峰式大小写?)。
当运行上面的代码时,我得到这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'
任何帮助将不胜感激!
目前无法在 Map
中使用 agg
和 countDistinct
。从documentation我们看到:
The available aggregate methods are avg, max, min, sum, count.
可能的解决方法是将 Map
更改为 Seq[Column]
、
val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
但如果用户要在配置文件中指定聚合,那将无济于事。
另一种方法是使用 expr
,此函数将评估一个字符串并返回一列。但是,expr
不会接受 "countDistinct"
,而是需要使用 "count(distinct(...))"
。
这可以编码如下:
val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
使用多个动态聚合操作在 Spark 数据帧上进行聚合。
我想使用 Scala 和多个动态聚合操作(由用户在 JSON 中传递)对 Spark 数据帧进行聚合。我正在将 JSON 转换为 Map
.
下面是一些示例数据:
colA colB colC colD
1 2 3 4
5 6 7 8
9 10 11 12
我使用的Spark聚合代码:
var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)
我必须将 aggFuncMap
作为 Map
传递,以便用户可以通过 JSON 配置传递任意数量的聚合。
以上代码对于某些聚合工作正常,包括 sum
、min
、max
、avg
和 count
.
然而,不幸的是,此代码不适用于 countDistinct
(可能是因为它是驼峰式大小写?)。
当运行上面的代码时,我得到这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'
任何帮助将不胜感激!
目前无法在 Map
中使用 agg
和 countDistinct
。从documentation我们看到:
The available aggregate methods are avg, max, min, sum, count.
可能的解决方法是将 Map
更改为 Seq[Column]
、
val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
但如果用户要在配置文件中指定聚合,那将无济于事。
另一种方法是使用 expr
,此函数将评估一个字符串并返回一列。但是,expr
不会接受 "countDistinct"
,而是需要使用 "count(distinct(...))"
。
这可以编码如下:
val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)