spark udaf 更新数组类型的元组

spark udaf update array type of tuple

我正在使用 Scala + Spark 2.0 并尝试编写一个 UDAF,它有一个元组数组作为它的内部缓冲区以及它的 return 类型: ...

def bufferSchema = new StructType().add("midResults", ArrayType(  StructType(Array(StructField("a", DoubleType),StructField("b", DoubleType))) ))

def dataType: DataType = ArrayType(  StructType(Array(StructField( "a", DoubleType),StructField("b", DoubleType))) )

这就是我更新缓冲区的方式

def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = buffer.getAs[mutable.WrappedArray[(Double,Double)]](3) ++ Array((3.0,4.0))
}

但我得到以下异常:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

如果我有一个简单的 Double 数组,这个模式就有效。

java.lang.ArrayStoreException"thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects" and this expected because a local Scala type for StructType is o.a.s.sql.Row 不是元组。换句话说,您应该使用 Seq[Row] 作为缓冲区字段并使用 Row 作为值。

备注:

  • 循环调用 ++ 可能不是最好的主意。
  • 如果您认为创建 UDAF 有点过时,因为 Spark 2.0 collect_list 支持复杂类型。
  • 可以说 AggregatorsUserDefinedAggregateFunctions 更加用户友好。