如何为 Scala 集合创建编码器(以实现自定义聚合器)?

How to create an Encoder for Scala collection (to implement custom Aggregator)?

Spark 2.3.0 与 Scala 2.11。我正在根据文档 here 实施自定义 Aggregator。聚合器需要 3 种类型的输入、缓冲区和输出。

我的聚合器必须对 window 中的所有先前行进行操作,所以我这样声明:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}

其中一种覆盖方法应该 return 缓冲区类型的编码器,在本例中是 ListBuffer。我找不到任何适合 org.apache.spark.sql.Encoders 的编码器,也找不到任何其他编码方式,所以我不知道要在这里 return 做什么。

我想创建一个新案例 class,它有一个 属性 类型 ListBuffer[Foo] 并将其用作我的缓冲区 class,然后使用 Encoders.product,但我不确定这是否有必要,或者我是否还缺少其他东西。感谢您的任何提示。

我在 org.apache.spark.sql.Encoders 中看不到任何可用于直接编码 ListBuffer 或就此而言甚至是 List

的内容

一个选项似乎是将它放在盒子里 class,正如你所建议的:

import org.apache.spark.sql.Encoders

case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]

另一种选择是使用 kryo:

Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]

或者最后你可以看看 ExpressionEncoders,它扩展了编码器:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]

这是最好的解决方案,因为它使所有内容对催化剂透明,因此允许它进行所有精彩的优化。

我玩的时候注意到一件事:

ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema

我在执行聚合时没有测试过以上任何一项,因此可能存在运行时问题。希望这对您有所帮助。

你应该让 Spark SQL 完成它的工作,然后使用 ExpressionEncoder 找到合适的编码器,如下所示:

scala> spark.version
res0: String = 2.3.0

case class Mod(id: Long)

import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]