Spark broadcast variable returns NullPointerException when run in Amazon EMR cluster
The variable I share through a broadcast is null in the cluster.
My application is fairly complex, but I have written this small example, which works flawlessly when I run it locally but fails in the cluster:
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr: Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // a small integer array [1, 2, 3, 4] is parallelized across two machines

  rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println) // NullPointerException in the flatMap: broadcasted is null
}
I don't know whether this is a coding mistake or a configuration problem.
This is the stack trace I get:
15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun.apply(PruebaBroadcast2.scala:24)
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun.apply(PruebaBroadcast2.scala:24)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
Command exiting with ret '1'
Can anyone help me fix this?
At the very least, if you see anything odd in the code, could you point it out?
If you think the code is fine, please tell me too, because that would mean the problem lies in the cluster configuration.
Thanks in advance.
I finally got it working.
Declaring the object like this does not work:
object MyObject extends App {
But it does work if you declare an object with a main function:
object MyObject {
  def main(args: Array[String]) {
    /* ... */
  }
}
So the short example from the question works if I rewrite it like this:
object PruebaBroadcast2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr: Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
  }
}
The problem seems to be related to this bug:
https://issues.apache.org/jira/browse/SPARK-4170
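As far as I understand that bug, the App trait relies on DelayedInit, so vals written directly in the object body are not set by a normal constructor; when the object is recreated on an executor that initialization never runs, and any field captured by a closure shows up there as null. A minimal sketch of an alternative workaround that follows from this (my own assumption, not part of the original fix, and untested on EMR): you can keep extends App as long as the Spark code lives inside a method, because the closures then capture method locals instead of fields of the App object:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  run()

  def run(): Unit = {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    // These are locals of run(), not fields of the DelayedInit object,
    // so their values are serialized directly into the closure below.
    val broadcasted = sc.broadcast((6 to 9).toArray)
    val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
  }
}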
I had a similar problem. The issue was that I had a variable and used it in an RDD map function, where it came back as null. This was my original code:
object MyClass extends App {
  ...
  val prefix = "prefix"
  val newRDD = inputRDD.map(s => prefix + s) // got null for prefix
  ...
}
And I found that it works inside any function, not just main():
object MyClass extends App {
  ...
  val prefix = "prefix"
  val newRDD = addPrefix(inputRDD, prefix)

  def addPrefix(input: RDD[String], prefix: String): RDD[String] = {
    // prefix is a method parameter here, so the closure no longer captures a field of MyClass
    input.map(s => prefix + s)
  }
}
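Applied to the broadcast example from the question, the same pattern would look roughly like this (a sketch under the same assumption; the helper name compute is mine):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)
  val broadcasted = sc.broadcast((6 to 9).toArray)

  compute(sc.parallelize((1 to 4).toSeq, 2), broadcasted).foreach(println)

  // The closure below captures the parameters rdd and b rather than
  // fields of the App object, so nothing is null on the executors.
  def compute(rdd: RDD[Int], b: Broadcast[Array[Int]]): Array[(Int, Int)] =
    rdd.flatMap(a => List((a, b.value(0)))).reduceByKey(_ + _).collect()
}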