Spark throws java.util.NoSuchElementException: key not found: 67
I am running Spark's bisecting k-means algorithm in Zeppelin:
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.clustering.BisectingKMeans

// I transform my data using the TF-IDF algorithm
val idf = new IDF(minFreq).fit(data)
val hashIDF_features = idf.transform(dbTF)
// and pass the transformed data to the clustering algorithm.
val bkm = new BisectingKMeans().setK(100).setMaxIterations(2)
val model = bkm.run(hashIDF_features)
val cluster_rdd = model.predict(hashIDF_features)
However, I always get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 270.0 failed 4 times, most recent failure: Lost task 0.3 in stage 270.0 (TID 126885, IP): java.util.NoSuchElementException: key not found: 67
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$$anonfun.apply$mcDJ$sp(BisectingKMeans.scala:338)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$$anonfun.apply(BisectingKMeans.scala:337)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$$anonfun.apply(BisectingKMeans.scala:337)
at scala.collection.TraversableOnce$$anonfun$minBy.apply(TraversableOnce.scala:231)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125)
at scala.collection.immutable.List.reduceLeft(List.scala:84)
at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231)
at scala.collection.AbstractTraversable.minBy(Traversable.scala:105)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments.apply(BisectingKMeans.scala:337)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments.apply(BisectingKMeans.scala:334)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:389)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:261)
at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$run.apply$mcVI$sp(BisectingKMeans.scala:194)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:189)
at $iwC$$iwC$$iwC$$iwC$$iwC$$$297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
at $iwC$$iwC$$iwC$$iwC$$iwC$$$297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95)
at $iwC$$iwC$$iwC$$iwC$$iwC$$$297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
at $iwC$$iwC$$iwC$$iwC$$iwC$$$297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99)
at $iwC$$iwC$$iwC$$iwC$$iwC$$$297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101)
I am using Spark 1.6.1.
Interestingly, the algorithm does not fail when I run it as a standalone application, but I get this error in Zeppelin. Besides that, the input is computed by an external algorithm, so I don't think it is a formatting problem. Any ideas?
Edit:
I tested the system again with a smaller number of clusters, and the error does not occur. Why would the algorithm break for large cluster values?
I think this issue is caused by a closure. When you run your application locally, everything might be running in the same memory/process, so make sure you are not trying to access a local variable from a closure that might be running in some other memory/process. That should help resolve your issue.
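As a minimal, self-contained sketch of the failure mode described above (the map contents, key values, and app name are made up for illustration and are not the asker's code), a closure that captures a driver-side map and then looks up a key the shipped copy does not contain raises exactly this kind of key not found exception inside the tasks:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical illustration only.
val sc = new SparkContext(new SparkConf().setAppName("closure-sketch").setMaster("local[2]"))

// A driver-side map, analogous to state built up in a notebook cell.
val lookup = Map(1 -> "a", 2 -> "b")

// The closure below captures `lookup`; a serialized copy is shipped to each task.
// Asking that copy for a key it does not contain throws
// java.util.NoSuchElementException: key not found: 67,
// the same failure seen in the stack trace above.
val resolved = sc.parallelize(Seq(1, 2, 67)).map(k => lookup(k)).collect()

println(resolved.mkString(", "))
sc.stop()

The same lookup succeeds whenever every key a task asks for happens to be present in the captured copy, which is why this kind of failure can show up only for some inputs or environments and not others.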