如何为 Scala 集合创建编码器(以实现自定义聚合器)?
How to create an Encoder for Scala collection (to implement custom Aggregator)?
Spark 2.3.0 与 Scala 2.11。我正在根据文档 here 实施自定义 Aggregator
。聚合器需要 3 种类型的输入、缓冲区和输出。
我的聚合器必须对 window 中的所有先前行进行操作,所以我这样声明:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
其中一种覆盖方法应该 return 缓冲区类型的编码器,在本例中是 ListBuffer
。我找不到任何适合 org.apache.spark.sql.Encoders
的编码器,也找不到任何其他编码方式,所以我不知道要在这里 return 做什么。
我想创建一个新案例 class,它有一个 属性 类型 ListBuffer[Foo]
并将其用作我的缓冲区 class,然后使用 Encoders.product
,但我不确定这是否有必要,或者我是否还缺少其他东西。感谢您的任何提示。
我在 org.apache.spark.sql.Encoders 中看不到任何可用于直接编码 ListBuffer 或就此而言甚至是 List
的内容
一个选项似乎是将它放在盒子里 class,正如你所建议的:
import org.apache.spark.sql.Encoders
case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]
另一种选择是使用 kryo:
Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]
或者最后你可以看看 ExpressionEncoders,它扩展了编码器:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]
这是最好的解决方案,因为它使所有内容对催化剂透明,因此允许它进行所有精彩的优化。
我玩的时候注意到一件事:
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema
我在执行聚合时没有测试过以上任何一项,因此可能存在运行时问题。希望这对您有所帮助。
你应该让 Spark SQL 完成它的工作,然后使用 ExpressionEncoder
找到合适的编码器,如下所示:
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
Spark 2.3.0 与 Scala 2.11。我正在根据文档 here 实施自定义 Aggregator
。聚合器需要 3 种类型的输入、缓冲区和输出。
我的聚合器必须对 window 中的所有先前行进行操作,所以我这样声明:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
其中一种覆盖方法应该 return 缓冲区类型的编码器,在本例中是 ListBuffer
。我找不到任何适合 org.apache.spark.sql.Encoders
的编码器,也找不到任何其他编码方式,所以我不知道要在这里 return 做什么。
我想创建一个新案例 class,它有一个 属性 类型 ListBuffer[Foo]
并将其用作我的缓冲区 class,然后使用 Encoders.product
,但我不确定这是否有必要,或者我是否还缺少其他东西。感谢您的任何提示。
我在 org.apache.spark.sql.Encoders 中看不到任何可用于直接编码 ListBuffer 或就此而言甚至是 List
的内容一个选项似乎是将它放在盒子里 class,正如你所建议的:
import org.apache.spark.sql.Encoders
case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]
另一种选择是使用 kryo:
Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]
或者最后你可以看看 ExpressionEncoders,它扩展了编码器:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]
这是最好的解决方案,因为它使所有内容对催化剂透明,因此允许它进行所有精彩的优化。
我玩的时候注意到一件事:
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema
我在执行聚合时没有测试过以上任何一项,因此可能存在运行时问题。希望这对您有所帮助。
你应该让 Spark SQL 完成它的工作,然后使用 ExpressionEncoder
找到合适的编码器,如下所示:
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]