将 case class 传递给函数参数
Passing case class into function arguments
抱歉问了一个简单的问题。我想将 case class 传递给函数参数,我想在函数内部进一步使用它。到目前为止,我已经用 TypeTag
和 ClassTag
尝试过,但由于某种原因,我无法正确使用它,或者可能是我看的地方不对。
用例与此类似:
case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)
def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
sqlContext
.read
.option("header", "true")
.csv(path)
.as[passedCaseClass]
}
它将被称为这样的东西:
val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)
首先将您的函数定义更改为:
object t0 {
def readCsv[T] (path: String)(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
}
您不需要执行任何类型的反射来创建通用的 readCsv 函数。这里的关键是 Spark 在编译时需要编码器。所以你可以把它作为隐式参数传递,编译器会添加它。
因为 Spark SQL 可以反序列化产品类型(您的情况 类),包括默认编码器,所以很容易调用您的函数,例如:
case class infoData(colA: Int, colB: String)
case class someOtherData(col1: String, col2: String, col3: Int)
object test {
import t0._
implicit val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
readCsv[infoData]("/tmp")
}
希望对您有所帮助
有两点需要注意,
- class 名称应在
CamelCase
中,因此 InfoData
.
- 一旦您将类型绑定到
DataSet
,它就不是 DataFrame
。 DataFrame
是通用 Row
. 的 DataSet
的特殊名称
您需要确保您提供的 class 在当前范围内具有相应 Encoder
的隐式实例。
case class InfoData(colA: Int, colB: String)
Encoder
原始类型(Int
、String
等)和 case classes
的实例可以通过导入 spark.implicits._
获得
def readCsv[T](path: String)(implicit encoder: Encoder: T): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
或者,您可以使用上下文绑定,
def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
现在,您可以按如下方式使用它,
val spark = ...
import spark.implicits._
def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
val infoDS = readCsv[InfoData]("/src/main/info.csv")
抱歉问了一个简单的问题。我想将 case class 传递给函数参数,我想在函数内部进一步使用它。到目前为止,我已经用 TypeTag
和 ClassTag
尝试过,但由于某种原因,我无法正确使用它,或者可能是我看的地方不对。
用例与此类似:
case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)
def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
sqlContext
.read
.option("header", "true")
.csv(path)
.as[passedCaseClass]
}
它将被称为这样的东西:
val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)
首先将您的函数定义更改为:
object t0 {
def readCsv[T] (path: String)(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
}
您不需要执行任何类型的反射来创建通用的 readCsv 函数。这里的关键是 Spark 在编译时需要编码器。所以你可以把它作为隐式参数传递,编译器会添加它。
因为 Spark SQL 可以反序列化产品类型(您的情况 类),包括默认编码器,所以很容易调用您的函数,例如:
case class infoData(colA: Int, colB: String)
case class someOtherData(col1: String, col2: String, col3: Int)
object test {
import t0._
implicit val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
readCsv[infoData]("/tmp")
}
希望对您有所帮助
有两点需要注意,
- class 名称应在
CamelCase
中,因此InfoData
. - 一旦您将类型绑定到
DataSet
,它就不是DataFrame
。DataFrame
是通用Row
. 的
DataSet
的特殊名称
您需要确保您提供的 class 在当前范围内具有相应 Encoder
的隐式实例。
case class InfoData(colA: Int, colB: String)
Encoder
原始类型(Int
、String
等)和 case classes
的实例可以通过导入 spark.implicits._
def readCsv[T](path: String)(implicit encoder: Encoder: T): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
或者,您可以使用上下文绑定,
def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
现在,您可以按如下方式使用它,
val spark = ...
import spark.implicits._
def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
spark
.read
.option("header", "true")
.csv(path)
.as[T]
}
val infoDS = readCsv[InfoData]("/src/main/info.csv")