如何最有效地将 Scala DataFrame 的 Row 转换为 case class?
How to convert Row of a Scala DataFrame into case class most efficiently?
一旦我进入 Spark 一些行 class,Dataframe 或 Catalyst,我想在我的代码中将它转换为一个案例 class。这可以通过匹配
来完成
someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}
但是当行有大量的列时,它会变得很难看,比如说十几个双精度值,一些布尔值,甚至偶尔有空值。
我只想能够 - 抱歉 - 将 Row 转换为 myCaseClass。有没有可能,或者我已经得到了最经济的语法?
据我所知,您不能将 Row 转换为案例 class,但我有时会选择直接访问行字段,例如
map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))
我发现这更容易,特别是如果 case class 构造函数只需要行中的某些字段。
当然,您可以将 Row 对象匹配到 case class。假设您的 SchemaType 有很多字段,您希望将其中的一些字段匹配到您的案例中 class。
如果你没有空字段,你可以简单地做:
case class MyClass(a: Long, b: String, c: Int, d: String, e: String)
dataframe.map {
case Row(a: java.math.BigDecimal,
b: String,
c: Int,
_: String,
_: java.sql.Date,
e: java.sql.Date,
_: java.sql.Timestamp,
_: java.sql.Timestamp,
_: java.math.BigDecimal,
_: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}
这种方法在空值的情况下会失败,并且还需要您明确定义每个字段的类型。
如果您必须处理空值,则需要通过
丢弃所有包含空值的行
dataframe.na.drop()
即使空字段不是您案例的模式匹配中使用的空字段,也会删除记录 class。
或者如果你想处理它,你可以将 Row 对象变成一个 List 然后使用选项模式:
case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)
dataframe.map(_.toSeq.toList match {
case List(a: java.math.BigDecimal,
b: String,
c: Int,
_: String,
_: java.sql.Date,
e: java.sql.Date,
_: java.sql.Timestamp,
_: java.sql.Timestamp,
_: java.math.BigDecimal,
_: String) => MyClass(
a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}
查看这个 github 项目 Sparkz(),它很快将引入大量库来简化 Spark 和 DataFrame API,并使它们更面向函数式编程。
DataFrame 只是 Dataset[Row] 的类型别名。这些操作也称为“非类型化转换”,与强类型 Scala/Java 数据集附带的“类型化转换”形成对比。
在spark中Dataset[Row]到Dataset[Person]的转换非常简单
val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")
此时,Spark 将您的数据转换为 DataFrame = Dataset[Row],一个通用 Row 对象的集合,因为它不知道确切的类型。
// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class)
val DStoProcess = DFtoProcess.as[Person](personEncoder)
现在,Spark 按照 class 人的指示转换 Dataset[Row] -> Dataset[Person]
类型特定的 Scala / Java JVM 对象。
详情请参考下面link由databricks提供
scala> import spark.implicits._
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> case class Student(id: Int, name: String)
defined class Student
scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]
spark.implicits._
中的 spark
是您的 SparkSession
。如果您在 REPL 中,会话已定义为 spark
,否则您需要相应地调整名称以对应于您的 SparkSession
.
一旦我进入 Spark 一些行 class,Dataframe 或 Catalyst,我想在我的代码中将它转换为一个案例 class。这可以通过匹配
来完成someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}
但是当行有大量的列时,它会变得很难看,比如说十几个双精度值,一些布尔值,甚至偶尔有空值。
我只想能够 - 抱歉 - 将 Row 转换为 myCaseClass。有没有可能,或者我已经得到了最经济的语法?
据我所知,您不能将 Row 转换为案例 class,但我有时会选择直接访问行字段,例如
map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))
我发现这更容易,特别是如果 case class 构造函数只需要行中的某些字段。
当然,您可以将 Row 对象匹配到 case class。假设您的 SchemaType 有很多字段,您希望将其中的一些字段匹配到您的案例中 class。 如果你没有空字段,你可以简单地做:
case class MyClass(a: Long, b: String, c: Int, d: String, e: String)
dataframe.map {
case Row(a: java.math.BigDecimal,
b: String,
c: Int,
_: String,
_: java.sql.Date,
e: java.sql.Date,
_: java.sql.Timestamp,
_: java.sql.Timestamp,
_: java.math.BigDecimal,
_: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}
这种方法在空值的情况下会失败,并且还需要您明确定义每个字段的类型。 如果您必须处理空值,则需要通过
丢弃所有包含空值的行dataframe.na.drop()
即使空字段不是您案例的模式匹配中使用的空字段,也会删除记录 class。 或者如果你想处理它,你可以将 Row 对象变成一个 List 然后使用选项模式:
case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)
dataframe.map(_.toSeq.toList match {
case List(a: java.math.BigDecimal,
b: String,
c: Int,
_: String,
_: java.sql.Date,
e: java.sql.Date,
_: java.sql.Timestamp,
_: java.sql.Timestamp,
_: java.math.BigDecimal,
_: String) => MyClass(
a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}
查看这个 github 项目 Sparkz(),它很快将引入大量库来简化 Spark 和 DataFrame API,并使它们更面向函数式编程。
DataFrame 只是 Dataset[Row] 的类型别名。这些操作也称为“非类型化转换”,与强类型 Scala/Java 数据集附带的“类型化转换”形成对比。
在spark中Dataset[Row]到Dataset[Person]的转换非常简单
val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")
此时,Spark 将您的数据转换为 DataFrame = Dataset[Row],一个通用 Row 对象的集合,因为它不知道确切的类型。
// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class)
val DStoProcess = DFtoProcess.as[Person](personEncoder)
现在,Spark 按照 class 人的指示转换 Dataset[Row] -> Dataset[Person]
类型特定的 Scala / Java JVM 对象。
详情请参考下面link由databricks提供
scala> import spark.implicits._
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> case class Student(id: Int, name: String)
defined class Student
scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]
spark.implicits._
中的 spark
是您的 SparkSession
。如果您在 REPL 中,会话已定义为 spark
,否则您需要相应地调整名称以对应于您的 SparkSession
.