作为 Spark 作业提交时 Spark RDD 映射中的 NullPointerException

NullPointerException in Spark RDD map when submitted as a spark job

我们正在尝试提交 spark 作业(spark 2.0、hadoop 2.7.2),但出于某种原因,我们在 EMR 中收到了一个相当神秘的 NPE。一切都像 scala 程序一样运行良好,所以我们不确定是什么导致了这个问题。这是堆栈跟踪:

18:02:55,271 ERROR Utils:91 - Aborting task java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply$mcV$sp(WriterContainer.scala:253) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply(WriterContainer.scala:252) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply(WriterContainer.scala:252) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

据我们所知,这是在以下方法中发生的:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
      "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

我们已将其缩小到 map 函数,因为它在作为 spark 作业提交时有效:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

有谁知道可能导致此问题的原因是什么?另外,我们该如何解决呢?我们很困惑。

我认为当工作人员试图访问仅存在于驱动程序而非工作人员上的 SparkContext 对象时,您会收到工作人员抛出的 NullPointerException

coalesce() 重新分区您的数据。当你只请求一个分区时,它会尝试将所有数据压缩在一个分区*中。这可能会给您的应用程序的内存足迹带来很大压力。

一般来说,最好不要将分区缩小为 1。

有关更多信息,请阅读:Spark NullPointerException with saveAsTextFile and this