访问 Spark 中的数组列
Access Array column in Spark
Spark DataFrame 包含 Array[Double] 类型的列。当我尝试在 map() 函数中取回它时,它会抛出 ClassCastException 异常。以下 Scala 代码生成异常。
case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
val arr:Array[Double] = r.getAs[Array[Double]]("x")
arr.sum
})
s.foreach(println)
例外是
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:23)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Cam 谁能解释一下为什么它不起作用?我应该怎么做呢?
我正在使用 Spark 1.5.1 和 scala 2.10.6
谢谢
ArrayType
在 Row
中表示为 scala.collection.mutable.WrappedArray
。您可以使用例如
提取它
val arr: Seq[Double] = r.getAs[Seq[Double]]("x")
或
val i: Int = ???
val arr = r.getSeq[Double](i)
甚至:
import scala.collection.mutable.WrappedArray
val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")
如果 DataFrame
相对较薄,那么模式匹配可能是更好的方法:
import org.apache.spark.sql.Row
df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)}
尽管您必须记住序列的类型是未选中的。
在 Spark >= 1.6 中你也可以使用 Dataset
如下:
df.select("x").as[Seq[Double]].rdd
这种方法也可以考虑:
val tuples = Seq(("Abhishek", "Sengupta", Seq("MATH", "PHYSICS")))
val dF = tuples.toDF("firstName", "lastName", "subjects")
case class StudentInfo(fName: String, lName: String, subjects: Seq[String])
val students = dF
.collect()
.map(row => StudentInfo(row.getString(0), row.getString(1), row.getSeq(2)))
students.foreach(println)
Spark DataFrame 包含 Array[Double] 类型的列。当我尝试在 map() 函数中取回它时,它会抛出 ClassCastException 异常。以下 Scala 代码生成异常。
case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
val arr:Array[Double] = r.getAs[Array[Double]]("x")
arr.sum
})
s.foreach(println)
例外是
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:23)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Cam 谁能解释一下为什么它不起作用?我应该怎么做呢? 我正在使用 Spark 1.5.1 和 scala 2.10.6
谢谢
ArrayType
在 Row
中表示为 scala.collection.mutable.WrappedArray
。您可以使用例如
val arr: Seq[Double] = r.getAs[Seq[Double]]("x")
或
val i: Int = ???
val arr = r.getSeq[Double](i)
甚至:
import scala.collection.mutable.WrappedArray
val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")
如果 DataFrame
相对较薄,那么模式匹配可能是更好的方法:
import org.apache.spark.sql.Row
df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)}
尽管您必须记住序列的类型是未选中的。
在 Spark >= 1.6 中你也可以使用 Dataset
如下:
df.select("x").as[Seq[Double]].rdd
这种方法也可以考虑:
val tuples = Seq(("Abhishek", "Sengupta", Seq("MATH", "PHYSICS")))
val dF = tuples.toDF("firstName", "lastName", "subjects")
case class StudentInfo(fName: String, lName: String, subjects: Seq[String])
val students = dF
.collect()
.map(row => StudentInfo(row.getString(0), row.getString(1), row.getSeq(2)))
students.foreach(println)