How to use countDistinct in Scala with Spark?
I've tried to use the countDistinct function, which should be available in Spark 1.5 according to the Databricks blog. However, I got the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct;
On the Spark developers' mailing list I found a suggestion to use the count and distinct functions to get the same result that countDistinct should produce:
count(distinct <columnName>)
// instead of
countDistinct(<columnName>)
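Because I build aggregation expressions dynamically from a list of aggregation function names, I'd prefer not to have any special cases that require different treatment.
So, is it possible to unify this by:
- registering a new UDAF that acts as an alias for count(distinct columnName)
- manually registering the CountDistinct function already implemented in Spark, which is probably the one from the following import:
import org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct}
Or is there some other way to do it?
EDIT:
Example (with some local references and unnecessary code removed):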
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, SQLContext, DataFrame}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
class Flattener(sc: SparkContext) {
  val sqlContext = new SQLContext(sc)
  // Groups by groupField and applies every type-appropriate aggregation to each column
  def flatTable(data: DataFrame, groupField: String): DataFrame = {
    val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)).
      flatMap(x => getFlatteningExpressions(x._1, x._2)).toList
    data.groupBy(groupField).agg (
      expr(s"count($groupField) as groupSize"),
      flatteningExpressions:_*
    )
  }
  // Builds one SQL expression per aggregation function, e.g. "avg(x) as x_avg"
  private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = {
    val aggFuncs = getAggregationFunctions(fieldType)
    aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f"))
  }
  // Chooses aggregation function names based on the column type
  private def getAggregationFunctions(fieldType: DType): List[String] = {
    val aggFuncs = new ListBuffer[String]()
    if(fieldType == DType.NUMERIC) {
      aggFuncs += ("avg", "min", "max")
    }
    if(fieldType == DType.CATEGORY) {
      // This is the name that expr fails to resolve (see the exception above)
      aggFuncs += "countDistinct"
    }
    aggFuncs.toList
  }
}
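Not sure if I really understood your problem, but here is an example of the countDistinct aggregate function:
// Assumes a spark-shell session where sc and sqlContext are already defined
import sqlContext.implicits._ // provides toDF and the 'symbol column syntax
import org.apache.spark.sql.functions.countDistinct
val values = Array((1, 2), (1, 3), (2, 2), (1, 2))
val myDf = sc.parallelize(values).toDF("id", "foo")
myDf.groupBy('id).agg(countDistinct('foo) as 'distinctFoo).show()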
/**
+---+-------------------+
| id|COUNT(DISTINCT foo)|
+---+-------------------+
|  1|                  2|
|  2|                  1|
+---+-------------------+
*/
countDistinct can be used in two different forms:
df.groupBy("A").agg(expr("count(distinct B)"))
or
df.groupBy("A").agg(countDistinct("B"))
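Because the expr form takes plain SQL text, one way to keep the name-driven approach from the question is to translate each function name into a SQL snippet before calling expr. The sketch below is Spark-free and only builds the strings. The object name AggExpr, the templates map, and the alias format are illustrative assumptions, not part of Spark.

```scala
// Hypothetical helper (not part of Spark): turns an aggregation function name
// and a column name into SQL text that could be passed to functions.expr.
object AggExpr {
  // Names that need non-default SQL syntax; any other name is rendered as name(column).
  private val templates: Map[String, String => String] = Map(
    "countDistinct" -> ((col: String) => s"count(distinct $col)")
  )

  // Builds "<call> as <column>_<func>", mirroring the alias format used in the question.
  def toSql(func: String, column: String): String = {
    val call = templates.get(func).map(_(column)).getOrElse(s"$func($column)")
    s"$call as ${column}_$func"
  }

  // Convenience wrapper for a whole list of function names on one column.
  def toSqlAll(funcs: Seq[String], column: String): Seq[String] =
    funcs.map(toSql(_, column))
}
```

With this, AggExpr.toSql("countDistinct", "B") yields the text count(distinct B) as B_countDistinct, and AggExpr.toSql("avg", "B") yields avg(B) as B_avg. Each string would then be wrapped in expr(...), so the caller no longer needs a special case for countDistinct.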
However, neither of the two forms of countDistinct works when you want to use it on the same column together with a custom UDAF (implemented as UserDefinedAggregateFunction in Spark 1.5):
// Assume that we have already implemented and registered StdDev UDAF
df.groupBy("A").agg(countDistinct("B"), expr("StdDev(B)"))
// Will cause
Exception in thread "main" org.apache.spark.sql.AnalysisException: StdDev is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.;
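Due to these limitations, the most reasonable approach seems to be implementing countDistinct as a UDAF. That should allow all functions to be treated the same way, and countDistinct to be used together with other UDAFs.
An example implementation could look like this: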
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class CountDistinct extends UserDefinedAggregateFunction {
  // The aggregated column is expected to be a string
  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  // Add the incoming value to the values collected so far, dropping duplicates
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = (buffer.getSeq[String](0).toSet + input.getString(0)).toSeq
  }
  // The buffer holds the distinct values seen so far
  override def bufferSchema: StructType = StructType(
    StructField("items", ArrayType(StringType, true)) :: Nil
  )
  // Combine two partial buffers into one duplicate-free buffer
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = (buffer1.getSeq[String](0).toSet ++ buffer2.getSeq[String](0).toSet).toSeq
  }
  // Start with an empty buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq[String]()
  }
  override def deterministic: Boolean = true
  // The result is the number of distinct values collected
  override def evaluate(buffer: Row): Any = {
    buffer.getSeq[String](0).length
  }
  override def dataType: DataType = IntegerType
}
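To check the buffer logic without a cluster, the following Spark-free sketch mirrors the initialize / update / merge / evaluate steps of the UDAF above on plain Scala collections. The object name CountDistinctModel and the idea of passing partitions as nested sequences are assumptions made for illustration; Spark itself decides how rows are split and merged.

```scala
// Spark-free model of the UDAF's buffer handling (illustrative only).
object CountDistinctModel {
  type Buffer = Seq[String]

  // initialize: start with an empty buffer
  val empty: Buffer = Seq.empty

  // update: add one input value, keeping the buffer duplicate-free
  def update(buffer: Buffer, value: String): Buffer = (buffer.toSet + value).toSeq

  // merge: combine two partial buffers into one duplicate-free buffer
  def merge(b1: Buffer, b2: Buffer): Buffer = (b1.toSet ++ b2.toSet).toSeq

  // evaluate: the distinct count is the size of the final buffer
  def evaluate(buffer: Buffer): Int = buffer.length

  // Aggregate each partition separately, then merge the partial buffers.
  def run(partitions: Seq[Seq[String]]): Int = {
    val partials = partitions.map(_.foldLeft(empty)(update))
    evaluate(partials.foldLeft(empty)(merge))
  }
}
```

For example, CountDistinctModel.run(Seq(Seq("a", "b", "a"), Seq("b", "c"))) evaluates to 3, because the value "b" that appears in both partitions is collapsed during merge. Rebuilding a Set on every update keeps the code short but does extra work on high-cardinality columns.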