Empty output when reading a csv file into RStudio using SparkR

I am new to SparkR. I am trying to load a csv file into R using SparkR:

Sys.setenv(SPARK_HOME="/usr/local/bin/spark-1.5.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library(SparkR)

sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

I used a subset of the nyc flights dataset for testing. It has only 4 rows and 4 columns:

    gyear month day dep_time
     2013     1   1      517
     2013     1   1      533
     2013     1   1      542
     2013     1   1      544

n5 <- read.df(sqlContext, "/users/zhiyi.zhang/Downloads/n5.csv", "com.databricks.spark.csv", header="true")
head(n5)

Then I saw these errors when I tried to view the data:

15/11/03 13:45:53 ERROR CsvRelation$: Exception while parsing line: 2013,1,1,517. 

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getUTF8String(rows.scala:247)
at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:49)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:247)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV.apply(CsvRelation.scala:150)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV.apply(CsvRelation.scala:130)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/11/03 13:45:53 ERROR CsvRelation$: Exception while parsing line: 2013,1,1,533. 
15/11/03 13:45:53 ERROR CsvRelation$: Exception while parsing line: 2013,1,1,542. 
15/11/03 13:45:53 ERROR CsvRelation$: Exception while parsing line: 2013,1,1,544. 
[the same java.lang.ClassCastException and stack trace are repeated for each of these lines]
15/11/03 13:45:53 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2069 bytes result sent to driver
15/11/03 13:45:53 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 20 ms on localhost (1/1)
15/11/03 13:45:53 INFO DAGScheduler: ResultStage 3 (dfToCols at NativeMethodAccessorImpl.java:-2) finished in 0.021 s
15/11/03 13:45:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
15/11/03 13:45:53 INFO DAGScheduler: Job 3 finished: dfToCols at NativeMethodAccessorImpl.java:-2, took 0.030738 s

It then gives an empty output:

[1] gyear    month    day      dep_time
<0 rows> (or 0-length row.names)

Can anyone help me with this issue? Thanks a lot!

The pre-built Spark distributions are still built with Scala 2.10, not 2.11. So, if you use such a distribution (and I believe you do), you also need a spark-csv build for Scala 2.10, not one for Scala 2.11 (as the one you use in your code). The following code should then work fine:

 library(rJava)
 library(SparkR)
 library(nycflights13)

 df <- flights[1:4, 1:4]
 df
   year month day dep_time
 1 2013     1   1      517
 2 2013     1   1      533
 3 2013     1   1      542
 4 2013     1   1      544

 write.csv(df, file="~/scripts/temp.csv", quote=FALSE, row.names=FALSE)

 sc <- sparkR.init(sparkHome= "/usr/local/bin/spark-1.5.1-bin-hadoop2.6/", 
                   master="local",
                   sparkPackages="com.databricks:spark-csv_2.10:1.2.0")  # 2.10 here
 sqlContext <- sparkRSQL.init(sc)
 df_spark <- read.df(sqlContext, "/home/vagrant/scripts/temp.csv", "com.databricks.spark.csv", header="true")
 head(df_spark)
   year month day dep_time
 1 2013     1   1      517
 2 2013     1   1      533
 3 2013     1   1      542
 4 2013     1   1      544
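
If you are not sure which Scala version your pre-built Spark was compiled against, one way to check is the startup banner of the bundled spark-shell, which prints a line like "Using Scala version 2.10.4 (...)". Below is a minimal sketch of such a check from R; scala_binary_version() is a hypothetical helper of mine, not part of SparkR:

    # Minimal sketch, assuming a standard pre-built layout under SPARK_HOME.
    # scala_binary_version() is a hypothetical helper (not part of SparkR): it
    # starts the bundled spark-shell with an empty stdin, so that it exits right
    # after printing its banner, and extracts the "Using Scala version x.y.z" line.
    scala_binary_version <- function(spark_home = Sys.getenv("SPARK_HOME")) {
      shell <- file.path(spark_home, "bin", "spark-shell")
      out   <- system2(shell, stdout = TRUE, stderr = TRUE, input = "")
      line  <- grep("Using Scala version", out, value = TRUE)
      # "Using Scala version 2.10.4 ..." -> binary version "2.10"
      sub(".*Using Scala version (\\d+\\.\\d+).*", "\\1", line[1])
    }

    scala_binary_version()
    # e.g. [1] "2.10"  -> use "com.databricks:spark-csv_2.10:1.2.0"

Note also that spark-csv reads every column as a string by default; if you want numeric columns back, I believe the package also accepts an inferSchema="true" option in read.df.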