Spark SQL - 通用数据集 reader

Spark SQL - generic Dataset reader

我正在尝试创建通用 DataSet[T] reader 以避免每次 reader 调用都出现 dataframe.as[..]。 支持原始类型和大小写 类 所以我在想类似的东西:

def read[T <: Product](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

但是我遇到 'Unable to find encoder for type stored in Dataset' 错误。 是否可以做类似的事情?

第二个周期:

def read[T <: Product](sql : String) : Dataset[T] = {
  import sparkSession.implicits._
  innerRead(sql)
}

private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}  

以类型不匹配结束(foudn Encoder[Nothing],需要 Encoder[T])。

我试图只导入 newProductEncoder,但结果一样。

为了将 DataFrame 转换为 Dataset,您需要有一个 Encoder。您可以通过简单地为 T:

添加上下文绑定和 Encoder
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

上下文绑定是以下内容的语法糖:

def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]

这意味着您需要在隐式上下文中有一个(并且只有一个)Encoder[T].

实例

这是必需的,因为 as 方法本身需要此上下文绑定。

Spark 本身可以通过导入(正如您所做的那样)隐含函数,为您提供您可能需要的大部分 Encoders(原语、Strings 和 case classes)为了你的 SparkSession。但是,这些必须在调用站点的隐式范围内可用,这意味着您想要的可能更像以下内容:

def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
  import spark.implicits._
  val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
  df.as[T]
}

val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")

在您的第二个循环中,也许您需要将类型参数提供给您的 innerRead 调用:

innerRead[T](sql)