如何最有效地将 Scala DataFrame 的 Row 转换为 case class?

How to convert Row of a Scala DataFrame into case class most efficiently?

一旦我进入 Spark 一些行 class,Dataframe 或 Catalyst,我想在我的代码中将它转换为一个案例 class。这可以通过匹配

来完成
someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}

但是当行有大量的列时,它会变得很难看,比如说十几个双精度值,一些布尔值,甚至偶尔有空值。

我只想能够 - 抱歉 - 将 Row 转换为 myCaseClass。有没有可能,或者我已经得到了最经济的语法?

据我所知,您不能将 Row 转换为案例 class,但我有时会选择直接访问行字段,例如

map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))

我发现这更容易,特别是如果 case class 构造函数只需要行中的某些字段。

当然,您可以将 Row 对象匹配到 case class。假设您的 SchemaType 有很多字段,您希望将其中的一些字段匹配到您的案例中 class。 如果你没有空字段,你可以简单地做:

case class MyClass(a: Long, b: String, c: Int, d: String, e: String)

dataframe.map {
  case Row(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}

这种方法在空值的情况下会失败,并且还需要您明确定义每个字段的类型。 如果您必须处理空值,则需要通过

丢弃所有包含空值的行
dataframe.na.drop()

即使空字段不是您案例的模式匹配中使用的空字段,也会删除记录 class。 或者如果你想处理它,你可以将 Row 对象变成一个 List 然后使用选项模式:

case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)

dataframe.map(_.toSeq.toList match {
  case List(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(
      a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}

查看这个 github 项目 Sparkz(),它很快将引入大量库来简化 Spark 和 DataFrame API,并使它们更面向函数式编程。

DataFrame 只是 Dataset[Row] 的类型别名。这些操作也称为“非类型化转换”,与强类型 Scala/Java 数据集附带的“类型化转换”形成对比。

在spark中Dataset[Row]到Dataset[Person]的转换非常简单

val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")

此时,Spark 将您的数据转换为 DataFrame = Dataset[Row],一个通用 Row 对象的集合,因为它不知道确切的类型。

// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class) 

val DStoProcess = DFtoProcess.as[Person](personEncoder)

现在,Spark 按照 class 人的指示转换 Dataset[Row] -> Dataset[Person] 类型特定的 Scala / Java JVM 对象。

详情请参考下面link由databricks提供

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

scala> import spark.implicits._    
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> case class Student(id: Int, name: String)
defined class Student

scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]

spark.implicits._ 中的 spark 是您的 SparkSession。如果您在 REPL 中,会话已定义为 spark,否则您需要相应地调整名称以对应于您的 SparkSession.