如何在 Spark 数据集中存储嵌套的自定义对象?

How to store nested custom objects in Spark Dataset?

问题是

的后续问题

Spark 版本:3.0.1

可以实现非嵌套自定义类型:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class AnObj(val a: Int, val b: String)

implicit val myEncoder: Encoder[AnObj] = Encoders.kryo[AnObj] 

val d = spark.createDataset(Seq(new AnObj(1, "a")))

d.printSchema
root
 |-- value: binary (nullable = true)

但是,如果自定义类型 嵌套 product 类型中(即 case class),则会报错:

java.lang.UnsupportedOperationException: No Encoder found for InnerObj

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class InnerObj(val a: Int, val b: String)
case class MyObj(val i: Int, val j: InnerObj)

implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj] 

// error
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
// it gives Runtime error: java.lang.UnsupportedOperationException: No Encoder found for InnerObj

我们如何创建具有嵌套自定义类型的 Dataset

为 MyObj 和 InnerObj 添加编码器应该可以正常工作。

  class InnerObj(val a:Int, val b: String)
  case class MyObj(val i: Int, j: InnerObj)

  implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
  implicit val objEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]

上面的代码片段可以编译 运行 很好

除 sujesh 之外的另一个解决方案:

import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class InnerObj(val a: Int, val b: String)
case class MyObj[T](val i: Int, val j: T)

implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]] 

// works
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))

这也说明了可以从type parameter推导出内部类型和不能推导出内部类型的情况之间的区别。

前一种情况应该做:

implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]

后一种情况应该做:

implicit val myEncoder1: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val myEncoder2: Encoder[MyObj] = Encoders.kryo[MyObj]