在多个列上应用自定义 Spark 聚合器 (Spark 2.0)
Apply a custom Spark Aggregator on multiple columns (Spark 2.0)
我为字符串创建了自定义 Aggregator[]
。
我想将它应用于 DataFrame
的所有列,其中所有列都是字符串,但列号是任意的。
我一直在写正确的表达方式。我想写这样的东西:
df.agg( df.columns.map( c => myagg(df(c)) ) : _*)
考虑到各种接口,这显然是错误的。
我查看了 RelationalGroupedDataset.agg(expr: Column, exprs: Column*)
代码,但我不熟悉表达式操作。
有什么想法吗?
与对单个字段(列)进行操作的 UserDefinedAggregateFunctions
相比,Aggregtors
需要完整的 Row
/ 值。
如果你想要 Aggregator
可以在你的代码段中使用,它必须由列名参数化并使用 Row
作为值类型。
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
用法示例:
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
可以说 Aggregators
与静态类型 Datasets
结合使用比 Dataset<Row>
.
更有意义
根据您的要求,您还可以使用 Seq[_]
累加器一次聚合多个列,并在单个 merge
调用中处理整个 Row
(记录)。
我为字符串创建了自定义 Aggregator[]
。
我想将它应用于 DataFrame
的所有列,其中所有列都是字符串,但列号是任意的。
我一直在写正确的表达方式。我想写这样的东西:
df.agg( df.columns.map( c => myagg(df(c)) ) : _*)
考虑到各种接口,这显然是错误的。
我查看了 RelationalGroupedDataset.agg(expr: Column, exprs: Column*)
代码,但我不熟悉表达式操作。
有什么想法吗?
与对单个字段(列)进行操作的 UserDefinedAggregateFunctions
相比,Aggregtors
需要完整的 Row
/ 值。
如果你想要 Aggregator
可以在你的代码段中使用,它必须由列名参数化并使用 Row
作为值类型。
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
用法示例:
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
可以说 Aggregators
与静态类型 Datasets
结合使用比 Dataset<Row>
.
根据您的要求,您还可以使用 Seq[_]
累加器一次聚合多个列,并在单个 merge
调用中处理整个 Row
(记录)。