NullPointerException in Spark RDD map when submitted as a spark job
We're trying to submit a spark job (spark 2.0, hadoop 2.7.2), but for some reason we're receiving a rather cryptic NPE in EMR. Everything runs fine as a scala program, so we're not really sure what's causing the issue. Here's the stack trace:
18:02:55,271 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply$mcV$sp(WriterContainer.scala:253)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply(WriterContainer.scala:252)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows.apply(WriterContainer.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
As far as we can tell, it's occurring in the following method:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
    "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
We've narrowed it down to the map function, since this version works when submitted as a spark job:
def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Does anyone know what might be causing this issue, and how we might resolve it? We're stumped.
I think a NullPointerException is thrown by a worker when it tries to access a SparkContext object that's only present on the driver, not on the workers.
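For illustration, here is a minimal sketch of that anti-pattern (the names and the object wrapper are hypothetical, not from the original post): a closure that captures the driver-side SparkSession and dereferences its context on an executor, versus one that captures only plain data.

import org.apache.spark.sql.SparkSession

object ClosureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("npe-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 10)

    // BAD (illustrative): `spark` is captured by the closure and shipped to
    // executors; the driver-side context inside it is transient, so
    // dereferencing it on a worker can surface as a NullPointerException.
    // rdd.map(i => spark.sparkContext.defaultParallelism + i).collect()

    // OK: resolve driver-side values first, then capture only plain data.
    val parallelism = spark.sparkContext.defaultParallelism
    val result = rdd.map(i => parallelism + i).collect()
    println(result.mkString(","))

    spark.stop()
  }
}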
coalesce() repartitions your data. When you request just one partition, it will try to squeeze all of the data into a single partition*, which can put a lot of pressure on your application's memory footprint.
In general, it's good practice not to shrink your partitions down to 1.
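As a sketch of that advice applied to the question's method (the partition count of 8 is an illustrative value, not a recommendation; the right number depends on data volume and executor memory):

import org.apache.spark.sql.{DataFrame, SaveMode}

def process(dataFrame: DataFrame, s3Bucket: String): Unit = {
  // Keep a modest partition count instead of coalesce(1): each task then
  // writes a slice of the data rather than one task holding everything.
  dataFrame
    .coalesce(8)
    .write
    .mode(SaveMode.Overwrite)
    .text(s3Bucket)
}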
For more, read: Spark NullPointerException with saveAsTextFile and this.
*If you're not sure what a partition is, I explain it in memoryOverhead issue in Spark.