如何使用 case class 类型参数创建数据集? (无法找到 T 型编码器)

How to create Dataset with case class Type Parameter ? (Unable to find encoder for type T)

我正在尝试从类型为 T 的 RDD 创建数据集,这被认为是一个案例 class,作为我的函数的参数传递。问题是,隐式编码器不适用于此处。我应该如何设置类型参数才能创建数据集?

我试过将 T 设置为 T: ClassTag 或使用 implicit ClassTag 但没有用。如果我使用提供类型的代码,它就可以工作,所以我想传递的特定 class 类型没有问题(基本情况 class)。

在我的用例中,我在函数中做了其他事情,但这是基本问题。

def createDatasetFromRDD[T](rdd: RDD[T])(implicit tag: ClassTag[T]): Dataset[T] = {
  // Convert RDD to Dataset
  sparkSession.createDataset(rdd)
}

我收到错误消息:

error: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset.
 Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

有什么帮助或建议吗?

编辑:

已知 T 是个案 class。我知道 case classes 可以使用产品编码器,所以我基本上想让 scala 知道它可以使用这个。 Kryo 听起来不错,但没有提供产品 Encoder 的优势。

This 文章有很好的解释,理解编译器警告以及解决问题的解决方案。它涵盖了内容、原因和方式。

简而言之,这应该可以解决您的问题:

implicit def kryoEncoder[T](implicit ct: ClassTag[T]) = org.apache.spark.sql.Encoders.kryo[T](ct)

def createDatasetFromRDD[T](rdd: RDD[T]): Dataset[T] = {
    // Convert RDD to Dataset
    sparkSession.createDataset(rdd)
}

当您知道 Product Encoder 应该足够时,我搜索并找到了不使用 Kryo 的解决方案

TLDR

def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T] = {
  // Convert RDD to Dataset
  sparkSession.createDataset(rdd)(Encoders.product[T])
}

解释:

Kryo 有一些缺点here。相反,为什么不使用 Product 编码器,它实际上是 spark 在案例 classes 中使用的编码器?

所以如果我去 :

  sparkSession.createDataset(rdd)(Encoders.product[T])

我收到错误 type arguments [T] do not conform to method product's type parameter bounds [T <: Product]。好吧,让我们提到 Product :

def createDatasetFromRDD[T <: Product](rdd: RDD[T]): Dataset[T]

现在我得到了No TypeTag available for T。没关系,让我们放一个 TypeTag !

def createDatasetFromRDD[T <: Product : TypeTag](rdd: RDD[T]): Dataset[T]

就是这样!现在您可以为该函数提供一个 case class 类型,并且将使用产品编码器而无需任何其他代码。 如果您的 class 不适用于 [T <: Product] 那么您可能需要查看 kode 的答案。

编辑

正如 Luis Miguel Mejía Suárez 所说,另一种解决方案是提供这样的编码器:

def createDatasetFromRDD[T : Encoder](rdd: RDD[T]): Dataset[T]

并且调用者负责在隐式作用域上安装编码器,如果是这种情况 class,一个简单的 import spark.implicits._ 就足够了。如果没有,那么用户就是必须提供 kryo 编码器的人。