如何定义方法来处理具有参数化类型的数据集?
How to define methods to deal with Datasets with parametrized types?
我正在尝试定义一些将 Dataset
s(类型为 DataFrame
s)作为输入并产生另一个作为输出的函数,我希望它们足够灵活以处理参数化类型。在这个例子中,我需要一个列来表示用户的 ID,但是如果该 ID 是一个 Int、一个 Long、一个 String 等,这对我的函数来说并不重要。这就是为什么我的案例 类 有这个类型参数 A
.
我起初尝试简单地编写我的函数并使用 Dataset
而不是 DataFrame
:
import org.apache.spark.sql.Dataset
case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)
def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]] // suppose there are some transformations here
}
...但我收到此错误:
Unable to find encoder for type OutputT[A]. An implicit Encoder[OutputT[A]] is needed
to store OutputT[A] instances in a Dataset. Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._ Support
for serializing other types will be added in future releases.
所以我尝试为我的类型提供编码器:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)
implicit def enc[A]: Encoder[InputT[A]] = implicitly(ExpressionEncoder[OutputT[A]])
def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]] // suppose there are some transformations here
}
现在我得到这个错误:
No TypeTag available for OutputT[A]
如果代码与上面相同,但没有类型参数(例如,使用 String
而不是 A
),则没有错误。
尽可能避免使用 import spark.implicits._
魔法,我应该怎么做才能解决这个问题? Dataset
甚至有可能达到这种灵活性吗?
如果您检查 Scaladoc,您会看到 as
需要 Encoder
,因此您只需将其添加到范围。
def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[[OutputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]]
}
此外,您可能想看看 Where does Scala look for implicits。
我正在尝试定义一些将 Dataset
s(类型为 DataFrame
s)作为输入并产生另一个作为输出的函数,我希望它们足够灵活以处理参数化类型。在这个例子中,我需要一个列来表示用户的 ID,但是如果该 ID 是一个 Int、一个 Long、一个 String 等,这对我的函数来说并不重要。这就是为什么我的案例 类 有这个类型参数 A
.
我起初尝试简单地编写我的函数并使用 Dataset
而不是 DataFrame
:
import org.apache.spark.sql.Dataset
case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)
def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]] // suppose there are some transformations here
}
...但我收到此错误:
Unable to find encoder for type OutputT[A]. An implicit Encoder[OutputT[A]] is needed
to store OutputT[A] instances in a Dataset. Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._ Support
for serializing other types will be added in future releases.
所以我尝试为我的类型提供编码器:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
case class InputT[A](id: A, data: Long)
case class OutputT[A](id: A, dataA: Long, dataB: Long)
implicit def enc[A]: Encoder[InputT[A]] = implicitly(ExpressionEncoder[OutputT[A]])
def someFunction[A](ds: Dataset[InputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]] // suppose there are some transformations here
}
现在我得到这个错误:
No TypeTag available for OutputT[A]
如果代码与上面相同,但没有类型参数(例如,使用 String
而不是 A
),则没有错误。
尽可能避免使用 import spark.implicits._
魔法,我应该怎么做才能解决这个问题? Dataset
甚至有可能达到这种灵活性吗?
如果您检查 Scaladoc,您会看到 as
需要 Encoder
,因此您只需将其添加到范围。
def someFunction[A](ds: Dataset[InputT[A]])(implicit ev: Encoder[[OutputT[A]]): Dataset[OutputT[A]] = {
ds.select().as[OutputT[A]]
}
此外,您可能想看看 Where does Scala look for implicits。