Scala-Spark 动态调用带参数值的groupby和agg
Scala-Spark Dynamically call groupby and agg with parameter values
我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合图。我不知道列名和聚合图预先。我想写一个类似于下面的函数。但是我是Scala的新手,我无法解决它。
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
}
并想像
那样称呼它
val listOfStrings = List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
我该怎么做?
谁能帮帮我。
您的代码几乎是正确的 - 有两个问题:
你函数的return类型是DataFrame
,但是最后一行是aggregated.show()
,也就是returns Unit
.删除对 show
对 return aggregated
本身的调用,或者只是 return agg
的结果立即
DataFrame.groupBy
需要如下参数:col1: String, cols: String*
- 因此您需要传递匹配的参数:第一列,然后是其余列作为参数列表,您可以按如下方式执行此操作:df.groupBy(cols.head, cols.tail: _*)
总而言之,您的函数将是:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated
}
或者类似的较短版本:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
如果您确实想要在您的函数中调用show
:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
aggregated
}
我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合图。我不知道列名和聚合图预先。我想写一个类似于下面的函数。但是我是Scala的新手,我无法解决它。
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
}
并想像
那样称呼它val listOfStrings = List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
我该怎么做? 谁能帮帮我。
您的代码几乎是正确的 - 有两个问题:
你函数的return类型是
DataFrame
,但是最后一行是aggregated.show()
,也就是returnsUnit
.删除对show
对 returnaggregated
本身的调用,或者只是 returnagg
的结果立即DataFrame.groupBy
需要如下参数:col1: String, cols: String*
- 因此您需要传递匹配的参数:第一列,然后是其余列作为参数列表,您可以按如下方式执行此操作:df.groupBy(cols.head, cols.tail: _*)
总而言之,您的函数将是:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated
}
或者类似的较短版本:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
如果您确实想要在您的函数中调用show
:
def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
aggregated
}