为什么 spark (scala API) agg 函数接受 expr 和 exprs 参数?
Why spark (scala API) agg function takes expr and exprs arguments?
Spark API RelationalGroupedDataset
有一个函数 agg
:
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
toDF((expr +: exprs).map {
case typed: TypedColumn[_, _] =>
typed.withInputType(df.exprEnc, df.logicalPlan.output).expr
case c => c.expr
})
}
为什么它需要两个单独的参数?为什么不能只用 exprs: Column*
?
有人有接受一个参数的隐式函数吗?
这是为了确保您至少指定一个参数。
纯可变参数无法做到这一点,您可以在不带任何参数的情况下调用该方法。
我试着想象它会如何使用 cats.data.NonEmptyList
(需要 cats-core
依赖项:libraryDependencies += "org.typelevel" %% "cats-core" % "2.1.1"
):
import cats.data.NonEmptyList
implicit class RelationalGroupedDatasetOps(
private val rgd: RelationalGroupedDataset
) {
def aggOnNonEmpty(nonEmptyColumns: NonEmptyList[Column]): DataFrame =
rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)
def aggUnsafe(columnList: List[Column]): DataFrame = {
val nonEmptyColumns = NonEmptyList.fromListUnsafe(columnList)
rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)
}
}
对于 scala 2.12 使用标准库 List
:
implicit class RelationalGroupedDatasetOps(
private val rgd: RelationalGroupedDataset
) {
def aggUnsafe(aggColumns: List[Column]): DataFrame =
aggColumns match {
case ::(head, tail) => rgd.agg(head, tail:_*)
case Nil => throw new IllegalArgumentException(
"aggColumns parameter can not be empty for aggregation"
)
}
}
使用示例:
import Implicits.RelationalGroupedDatasetOps
// some data with columns id, category(int), amount(double)
val df: DataFrame = ???
df.groupBy("id")
.aggUnsafe(
df.columns.filter(c => c != "id").map(c => sum(c))
) // returns aggregated DataFrame
Spark API RelationalGroupedDataset
有一个函数 agg
:
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
toDF((expr +: exprs).map {
case typed: TypedColumn[_, _] =>
typed.withInputType(df.exprEnc, df.logicalPlan.output).expr
case c => c.expr
})
}
为什么它需要两个单独的参数?为什么不能只用 exprs: Column*
?
有人有接受一个参数的隐式函数吗?
这是为了确保您至少指定一个参数。
纯可变参数无法做到这一点,您可以在不带任何参数的情况下调用该方法。
我试着想象它会如何使用 cats.data.NonEmptyList
(需要 cats-core
依赖项:libraryDependencies += "org.typelevel" %% "cats-core" % "2.1.1"
):
import cats.data.NonEmptyList
implicit class RelationalGroupedDatasetOps(
private val rgd: RelationalGroupedDataset
) {
def aggOnNonEmpty(nonEmptyColumns: NonEmptyList[Column]): DataFrame =
rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)
def aggUnsafe(columnList: List[Column]): DataFrame = {
val nonEmptyColumns = NonEmptyList.fromListUnsafe(columnList)
rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)
}
}
对于 scala 2.12 使用标准库 List
:
implicit class RelationalGroupedDatasetOps(
private val rgd: RelationalGroupedDataset
) {
def aggUnsafe(aggColumns: List[Column]): DataFrame =
aggColumns match {
case ::(head, tail) => rgd.agg(head, tail:_*)
case Nil => throw new IllegalArgumentException(
"aggColumns parameter can not be empty for aggregation"
)
}
}
使用示例:
import Implicits.RelationalGroupedDatasetOps
// some data with columns id, category(int), amount(double)
val df: DataFrame = ???
df.groupBy("id")
.aggUnsafe(
df.columns.filter(c => c != "id").map(c => sum(c))
) // returns aggregated DataFrame