Spark SQL - 通用数据集 reader
Spark SQL - generic Dataset reader
我正在尝试创建通用 DataSet[T] reader 以避免每次 reader 调用都出现 dataframe.as[..]。
支持原始类型和大小写 类 所以我在想类似的东西:
def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
但是我遇到 'Unable to find encoder for type stored in Dataset' 错误。
是否可以做类似的事情?
第二个周期:
def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
以类型不匹配结束(foudn Encoder[Nothing],需要 Encoder[T])。
我试图只导入 newProductEncoder,但结果一样。
为了将 DataFrame
转换为 Dataset
,您需要有一个 Encoder
。您可以通过简单地为 T
:
添加上下文绑定和 Encoder
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
上下文绑定是以下内容的语法糖:
def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
这意味着您需要在隐式上下文中有一个(并且只有一个)Encoder[T]
.
实例
这是必需的,因为 as
方法本身需要此上下文绑定。
Spark 本身可以通过导入(正如您所做的那样)隐含函数,为您提供您可能需要的大部分 Encoder
s(原语、String
s 和 case class
es)为了你的 SparkSession
。但是,这些必须在调用站点的隐式范围内可用,这意味着您想要的可能更像以下内容:
def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
在您的第二个循环中,也许您需要将类型参数提供给您的 innerRead 调用:
innerRead[T](sql)
我正在尝试创建通用 DataSet[T] reader 以避免每次 reader 调用都出现 dataframe.as[..]。 支持原始类型和大小写 类 所以我在想类似的东西:
def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
但是我遇到 'Unable to find encoder for type stored in Dataset' 错误。 是否可以做类似的事情?
第二个周期:
def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
以类型不匹配结束(foudn Encoder[Nothing],需要 Encoder[T])。
我试图只导入 newProductEncoder,但结果一样。
为了将 DataFrame
转换为 Dataset
,您需要有一个 Encoder
。您可以通过简单地为 T
:
Encoder
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
上下文绑定是以下内容的语法糖:
def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
这意味着您需要在隐式上下文中有一个(并且只有一个)Encoder[T]
.
这是必需的,因为 as
方法本身需要此上下文绑定。
Spark 本身可以通过导入(正如您所做的那样)隐含函数,为您提供您可能需要的大部分 Encoder
s(原语、String
s 和 case class
es)为了你的 SparkSession
。但是,这些必须在调用站点的隐式范围内可用,这意味着您想要的可能更像以下内容:
def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
在您的第二个循环中,也许您需要将类型参数提供给您的 innerRead 调用:
innerRead[T](sql)