如何在 Spark 数据集中存储嵌套的自定义对象?
How to store nested custom objects in Spark Dataset?
问题是
的后续问题
Spark 版本:3.0.1
可以实现非嵌套自定义类型:
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class AnObj(val a: Int, val b: String)
implicit val myEncoder: Encoder[AnObj] = Encoders.kryo[AnObj]
val d = spark.createDataset(Seq(new AnObj(1, "a")))
d.printSchema
root
|-- value: binary (nullable = true)
但是,如果自定义类型 嵌套 在 product
类型中(即 case class
),则会报错:
java.lang.UnsupportedOperationException: No Encoder found for InnerObj
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class InnerObj(val a: Int, val b: String)
case class MyObj(val i: Int, val j: InnerObj)
implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
// error
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
// it gives Runtime error: java.lang.UnsupportedOperationException: No Encoder found for InnerObj
我们如何创建具有嵌套自定义类型的 Dataset
?
为 MyObj 和 InnerObj 添加编码器应该可以正常工作。
class InnerObj(val a:Int, val b: String)
case class MyObj(val i: Int, j: InnerObj)
implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val objEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]
上面的代码片段可以编译 运行 很好
除 sujesh 之外的另一个解决方案:
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class InnerObj(val a: Int, val b: String)
case class MyObj[T](val i: Int, val j: T)
implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]
// works
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
这也说明了可以从type parameter
推导出内部类型和不能推导出内部类型的情况之间的区别。
前一种情况应该做:
implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]
后一种情况应该做:
implicit val myEncoder1: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val myEncoder2: Encoder[MyObj] = Encoders.kryo[MyObj]
问题是
Spark 版本:3.0.1
可以实现非嵌套自定义类型:
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class AnObj(val a: Int, val b: String)
implicit val myEncoder: Encoder[AnObj] = Encoders.kryo[AnObj]
val d = spark.createDataset(Seq(new AnObj(1, "a")))
d.printSchema
root
|-- value: binary (nullable = true)
但是,如果自定义类型 嵌套 在 product
类型中(即 case class
),则会报错:
java.lang.UnsupportedOperationException: No Encoder found for InnerObj
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class InnerObj(val a: Int, val b: String)
case class MyObj(val i: Int, val j: InnerObj)
implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
// error
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
// it gives Runtime error: java.lang.UnsupportedOperationException: No Encoder found for InnerObj
我们如何创建具有嵌套自定义类型的 Dataset
?
为 MyObj 和 InnerObj 添加编码器应该可以正常工作。
class InnerObj(val a:Int, val b: String)
case class MyObj(val i: Int, j: InnerObj)
implicit val myEncoder: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val objEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]
上面的代码片段可以编译 运行 很好
除 sujesh 之外的另一个解决方案:
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
class InnerObj(val a: Int, val b: String)
case class MyObj[T](val i: Int, val j: T)
implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]
// works
val d = spark.createDataset(Seq(new MyObj(1, new InnerObj(0, "a"))))
这也说明了可以从type parameter
推导出内部类型和不能推导出内部类型的情况之间的区别。
前一种情况应该做:
implicit val myEncoder: Encoder[MyObj[InnerObj]] = Encoders.kryo[MyObj[InnerObj]]
后一种情况应该做:
implicit val myEncoder1: Encoder[InnerObj] = Encoders.kryo[InnerObj]
implicit val myEncoder2: Encoder[MyObj] = Encoders.kryo[MyObj]