No Java class corresponding to Product with Serializable with Base found

I wrote two case classes that extend the Base abstract class, and I have a list of each (listA and listB). When I merge these two lists, I cannot convert the resulting list to an Apache Spark 1.6.1 Dataset.

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result: Dataset[Base with Product with Serializable] = sc.parallelize(list).toDS()

Apache Spark throws this exception:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

Creating an RDD from list works without any exception; it is only the subsequent toDS() call, which converts the RDD to a Dataset, that raises the exception above.

First, you can give list a more reasonable type, either by annotating it explicitly as List[Base], or by declaring Base extends Product with Serializable (if its only purpose is to be extended by case classes). But that is not enough, because

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

Note that abstract classes such as Base are not supported, and neither are custom encoders. You can, however, try the kryo encoder (or, as a last resort, javaSerialization); see the kryo-based workaround below.

Here is the complete working example:

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  // Encoder and Encoders live in org.apache.spark.sql, not org.apache.spark
  implicit def baseEncoder: org.apache.spark.sql.Encoder[Base] = org.apache.spark.sql.Encoders.kryo[Base]
}


val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS
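The type-inference half of the fix can be checked without Spark at all: once Base itself extends Product with Serializable, the least upper bound of A and B is just Base, so merging the two lists no longer produces the compound type Base with Product with Serializable. A minimal sketch (MergeDemo is a hypothetical name for illustration):

```scala
abstract class Base extends Product with Serializable

case class A(name: String) extends Base
case class B(age: Int) extends Base

object MergeDemo {
  // With Base already mixing in Product and Serializable,
  // listA ++ listB infers plain List[Base] with no annotation tricks.
  def merged: List[Base] = {
    val listA: List[A] = A("foo") :: A("bar") :: Nil
    val listB: List[B] = B(10) :: B(20) :: Nil
    listA ++ listB
  }

  def main(args: Array[String]): Unit =
    println(merged.size) // 4
}
```

This only cleans up the static type; the kryo encoder shown above is still needed because Spark 1.6 cannot derive an encoder for an abstract class.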