如何定义方法来处理具有参数化类型的数据集?

How to define methods to deal with Datasets with parametrized types?

我正在尝试定义一些将 Datasets(类型为 DataFrames)作为输入并产生另一个作为输出的函数,我希望它们足够灵活以处理参数化类型。在这个例子中,我需要一个列来表示用户的 ID,但是如果该 ID 是一个 Int、一个 Long、一个 String 等,这对我的函数来说并不重要。这就是为什么我的案例 类 有这个类型参数 A.

我起初尝试简单地编写我的函数并使用 Dataset 而不是 DataFrame:

import org.apache.spark.sql.Dataset

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}

...但我收到此错误:

Unable to find encoder for type OutputT[A]. An implicit Encoder[OutputT[A]] is needed
to store OutputT[A] instances in a Dataset. Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._  Support
for serializing other types will be added in future releases.

所以我尝试为我的类型提供编码器:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)

implicit def enc[A]: Encoder[InputT[A]] = implicitly(ExpressionEncoder[OutputT[A]])

def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]] // suppose there are some transformations here
}

现在我得到这个错误:

No TypeTag available for OutputT[A]

如果代码与上面相同,但没有类型参数(例如,使用 String 而不是 A),则没有错误。

尽可能避免使用 import spark.implicits._ 魔法,我应该怎么做才能解决这个问题? Dataset 甚至有可能达到这种灵活性吗?

如果您检查 Scaladoc,您会看到 as 需要 Encoder,因此您只需将其添加到范围。

def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[[OutputT[A]]): Dataset[OutputT[A]] = {
  ds.select().as[OutputT[A]]
}

此外,您可能想看看 Where does Scala look for implicits