如何在数据集中存储自定义对象?

How to store custom objects in Dataset?

根据Introducing Spark Datasets

As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we’d like to open up an API for custom objects.

并尝试将自定义类型存储在 Dataset 中会导致以下错误:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

或:

Java.lang.UnsupportedOperationException: No Encoder found for ....

是否有任何现有的解决方法?


请注意,此问题仅作为社区 Wiki 答案的入口点存在。欢迎随时更新/改进问题和答案。

  1. 使用通用编码器。

    目前有两种通用编码器可用kryo and javaSerialization,其中后者明确描述为:

    extremely inefficient and should only be used as the last resort.

    假设以下 class

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    您可以通过添加隐式编码器来使用这些编码器:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    可以组合使用如下:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    它将对象存储为 binary 列,因此当转换为 DataFrame 时,您将获得以下架构:

    root
     |-- value: binary (nullable = true)
    

    也可以使用 kryo 特定字段的编码器对元组进行编码:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    请注意,我们在这里不依赖隐式编码器,而是显式传递编码器,因此这很可能不适用于 toDS 方法。

  2. 使用隐式转换:

    提供可编码表示和自定义表示之间的隐式转换class,例如:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

相关问题:

编码器在 Spark2.0 中的工作原理大致相同。 Kryo 仍然是推荐的 serialization 选择。

您可以使用 spark-shell

查看以下示例
scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Til now] 当前范围内没有 appropriate encoders,因此我们的 persons 没有被编码为 binary 值。但是,一旦我们提供一些使用 Kryo 序列化的 implicit 编码器,这种情况就会改变。

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

更新

尽管自 2.2/2.3 以来情况有所好转,但此答案仍然有效且内容丰富,它增加了对 SetSeqMapDateTimestampBigDecimal。如果你坚持只使用 case classes 和通常的 Scala 类型来创建类型,那么只使用 SQLImplicits.

中的隐式应该没问题

不幸的是,几乎没有添加任何东西来帮助解决这个问题。在 Encoders.scala or SQLImplicits.scala 中搜索 @since 2.0.0 会发现主要与原始类型有关(以及一些大小写 classes 的调整)。所以,首先要说的是:目前对自定义 class 编码器 没有真正好的支持。考虑到我们目前拥有的资源,下面是一些技巧,它们可以像我们希望的那样做得很好。作为预先免责声明:这不会完美地工作,我会尽我最大的努力使所有限制明确和预先。

到底是什么问题

当你想制作一个数据集时,Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders"(取自docs on createDataset). An encoder will take the form Encoder[T] where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you these implicit encoders) and the second suggestion is to explicitly pass in the implicit encoder using this set of encoder related functions.

常规 classes 没有可用的编码器,因此

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

会给你以下隐含的相关编译时错误:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

但是,如果您将刚刚用于获取上述错误的任何类型包装在扩展 Product 的某些 class 中,错误会令人困惑地延迟到运行时,因此

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

编译正常,但在运行时失败

java.lang.UnsupportedOperationException: No Encoder found for MyObj

原因是 Spark 使用隐式创建的编码器实际上只在运行时创建(通过 scala refection)。在这种情况下,编译时的所有 Spark 检查都是最外层的 class extends Product(所有情况 classes 都会这样做),并且仅在运行时才意识到它仍然不知道如何处理 MyObj(如果我尝试创建 Dataset[(Int,MyObj)] 也会出现同样的问题 - Spark 会一直等到运行时在 MyObj 上呕吐)。这些是急需解决的核心问题:

  • 一些 class 扩展 Product 的 es 尽管总是在运行时崩溃并且
  • 无法为嵌套类型传递自定义编码器(我无法为 Spark 提供编码器 MyObj,这样它就知道如何编码 Wrap[MyObj](Int,MyObj)).

只需使用kryo

大家建议的解决方案是使用kryo编码器。

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

虽然这很快就会变得非常乏味。特别是如果您的代码正在处理各种数据集、连接、分组等。您最终会产生一堆额外的隐式。那么,为什么不做一个隐含的自动完成这一切呢?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

现在,似乎我几乎可以做任何我想做的事(下面的例子在自动导入 spark.implicits._spark-shell 中不起作用)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者差不多。问题是使用 kryo 会导致 Spark 将数据集中的每一行存储为平面二进制对象。对于mapfilterforeach这样就够了,但是对于像join这样的操作,Spark确实需要把这些分列出来。检查 d2d3 的架构,您会看到只有一个二进制列:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

元组的部分解

因此,利用 Scala 中隐式的魔力(6.26.3 Overloading Resolution 中有更多内容),我可以为自己制作一系列隐式,这些隐式将尽可能做好工作,至少对于元组而言是这样,并且会起作用与现有隐含的很好:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

然后,借助这些隐式,我可以使上面的示例正常工作,尽管需要对一些列进行重命名

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我还没有弄清楚如何在不重命名的情况下默认获得预期的元组名称(_1_2、...)——如果其他人想玩this, this is where the name "value" gets introduced and this 是通常添加元组名称的地方。然而,关键是我现在有一个很好的结构化模式:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

因此,总而言之,此解决方法:

  • 允许我们为元组获取单独的列(所以我们可以再次加入元组,耶!)
  • 我们可以再次依赖隐式(所以不需要到处传递 kryo
  • 几乎完全向后兼容import spark.implicits._(涉及一些重命名)
  • 让我们加入kyro序列化的二进制列,更不用说那些可能有
  • 的字段了
  • 具有将一些元组列重命名为 "value" 的令人不快的副作用(如有必要,这可以通过转换 .toDF、指定新的列名称并转换回数据集 - 模式名称似乎是通过连接保留的,这是最需要它们的地方)。

一般 classes 的部分解决方案

这个不太愉快,没有好的解决办法。但是,既然我们有了上面的元组解决方案,我有预感来自另一个答案的隐式转换解决方案也会稍微不那么痛苦,因为您可以将更复杂的 classes 转换为元组。然后,在创建数据集之后,您可能会使用数据框方法重命名列。如果一切顺利,这确实是 的改进,因为我现在可以在我的 classes 的字段上执行联接。如果我只使用一个平面二进制 kryo 序列化程序,那将是不可能的。

这是一个可以做所有事情的例子:我有一个 class MyObj,它有 Intjava.util.UUID 和 [=69] 类型的字段=].第一个照顾好自己。第二,虽然我可以使用 kryo 进行序列化,但如果存储为 String 会更有用(因为 UUID 通常是我想要加入的对象)。第三个确实属于二进制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

现在,我可以使用这种机制创建具有良好模式的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

模式向我展示了我可以加入的列的正确名称和前两个列。

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

如果是 Java Bean class,这会很有用

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

现在您可以简单地将 dataFrame 读取为自定义 DataFrame

dataFrame.as[MyClass]

这将创建一个自定义 class 编码器,而不是二进制编码器。

我的示例会在Java中,但我不认为适应Scala会很困难。

我使用 spark.createDataset and Encoders.bean as long as Fruit is a simple Java BeanRDD<Fruit> 转换为 Dataset<Fruit> 非常成功。

第 1 步:创建简单的 Java Bean。

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

在 DataBricks 人员增强其编码器之前,我会坚持 classes 使用原始类型和字符串作为字段。 如果您有一个带有嵌套对象的 class,请创建另一个简单的 Java Bean,并将其所有字段展平,这样您就可以使用 RDD 转换将复杂类型映射到更简单的类型。 当然,这是一些额外的工作,但我想它会对使用平面架构的性能有很大帮助。

第 2 步:从 RDD 获取数据集

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

瞧!起泡、冲洗、重复。

对于那些可能遇到我情况的人,我也把我的答案放在这里。

具体来说,

  1. 我正在从 SQLContext 读取 'Set typed data'。所以原始数据格式是DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. 然后使用 rdd.map() 将其转换为 mutable.WrappedArray 类型的 RDD。

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    结果:

    (1,Set(1))

您可以使用 UTDRegistration,然后使用 Case 类、元组等...都可以与您的用户定义类型一起正常工作!

假设您要使用自定义枚举:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

这样注册:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

然后使用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

假设您要使用多态记录:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... 并像这样使用它:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

您可以编写自定义 UDT,将所有内容编码为字节(我在这里使用 java 序列化,但检测 Spark 的 Kryo 上下文可能更好)。

首先定义UDTclass:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

然后注册:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

那你就可以用了!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

除了已经给出的建议之外,我最近发现的另一个选择是您可以声明您的自定义 class,包括特征 org.apache.spark.sql.catalyst.DefinedByConstructorParams.

如果 class 具有使用 ExpressionEncoder 可以理解的类型的构造函数,即原始值和标准集合,则此方法有效。当您无法将 class 声明为案例 class,但不想每次将其包含在数据集中时都使用 Kryo 对其进行编码时,它会派上用场。

例如,我想声明一个包含 Breeze 向量的案例 class。唯一能够处理这种情况的编码器通常是 Kryo。但是,如果我声明了一个扩展了 Breeze DenseVector 和 DefinedByConstructorParams 的 subclass,则 ExpressionEncoder 理解它可以被序列化为一个双精度数组。

我是这样声明的:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

现在我可以使用 SerializableDenseVector 在数据集中使用(直接或作为产品的一部分)使用简单的 ExpressionEncoder 而不是 Kryo。它的工作方式与 Breeze DenseVector 类似,但序列化为 Array[Double]。

@Alec 的回答很棒!只是在 his/her 答案的这一部分添加评论:

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

@Alec 提到:

there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).

似乎是这样,因为如果我为 MyObj 添加一个编码器:

implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

,还是失败:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
- field (class: "MyObj", name: "unwrap")
- root class: "Wrap"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor.apply(ScalaReflection.scala:643)

但请注意重要的错误信息:

root class: "Wrap"

它实际上暗示编码 MyObj 是不够的,你必须对 整个链 进行编码,包括 Wrap[T].

所以如果我这样做,就解决了问题:

implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]

因此,@Alec 的评论是 NOT 那是真的:

I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)

我们仍然有办法为 Spark 提供 MyObj 的编码器,这样它就知道如何编码 Wrap[MyObj] 或 (Int,MyObj)。