将 Spark RDD 作为文本文件写入 S3 存储桶

Writing Spark RDD as text file to S3 bucket

我正在尝试将 Spark RDD 作为 gzip 文本文件(或多个文本文件)保存到 S3 存储桶中。 S3 存储桶安装到 dbfs。我正在尝试使用以下方法保存文件:

rddDataset.saveAsTextFile("/mnt/mymount/myfolder/")

但是在尝试此操作时,我不断收到错误消息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 18.0 failed 4 times, most recent failure: Lost task 32.3 in stage 18.0 (TID 279, ip-10-81-194-225.ec2.internal): java.lang.NullPointerException

但是,我确实看到一些文件写入了 S3 存储桶。我也尝试过使用 rddDataset.repartition(1).saveAsTextFile("/mnt/mymount/myfolder/"),按照 here 的建议,但这以同样的错误结束。

这似乎与 this question 相似,所以这些错误可能是由于我的 RDD 中的空值造成的?但是当我尝试 val newRDD = rddDataset.map(line => line).filter(x => x!= null).filter(x => x!=" ").filter(x => x!="") 并尝试保存这个 RDD 时,我得到了同样的错误。

此外,rddDataset.count() 会引发类似的错误。我正在从一个数据框创建 rddDataset,它可以很好地显示它的所有行。但是,如果我将原始数据帧转换为 RDD,我可以重现 java.lang.NullPointerException

val testRDD = df.rdd
testRDD.count()

> org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 85.0 failed 4 times, most recent failure: Lost task 32.3 in stage 85.0 (TID 1668, ip-10-81-194-241.ec2.internal): java.lang.NullPointerException

我提供了以下堆栈跟踪之一:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1927)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1209)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1154)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:1060)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:952)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply$mcV$sp(RDD.scala:1457)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1436)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1436)
Caused by: java.lang.NullPointerException

此外,当我在 运行 rddDataset.repartition(200).saveAsTextFile(/mnt/mymount/myfolder) 之后打开阶段的信息选项卡时,我可以找到错误详细信息:

java.lang.NullPointerException
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:48)
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:48)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr35$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$$anonfun$apply.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$$anonfun$apply.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:46)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:235)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我在预处理数据时没有正确处理我的一个 UDF 中的空值。具体来说,我不得不从

更改我的一个 UDF
val converter = (arg: String) => {
  arg.split("").mkString("_").replace(":","_") 
}

val converter = (arg: String) => {
  if (arg == null || arg== "") "" else arg.split("").mkString("_").replace(":","_") 
}

之后似乎一切正常。