Spark 在集群模式下使用地图
Spark using map in cluster mode
我的 class 中有一个不可变的地图。当我 运行 我的代码处于本地模式时,没有问题,我可以到达地图中的每个键。但是,当我 运行 我的代码处于集群模式时,节点会抛出关于在映射中找不到键的错误。
我目前尝试的就是这些;
-在集群上广播不可变映射。
broadcast = sc.broadcast(my_immutable_map)
-将地图并行化为对RDD
my_map_rdd = sc.parallelize( my_immutable_map.toSeq)
当我检查日志时,我看到密钥未找到异常。
我的错误堆栈跟踪如下:
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 4 times, most recent failure: Lost task 1.3 in stage 15.0 (TID 25, datanode1.big.com): java.util.NoSuchElementException: key not found: 905053199731
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at havelsan.CDRGenerator$.generate_random_target(CDRGenerator.scala:95)
at havelsan.CDRGenerator$$anonfun$main$$anonfun.apply(CDRGenerator.scala:167)
at havelsan.CDRGenerator$$anonfun$main$$anonfun.apply(CDRGenerator.scala:165)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
请您解释一下 spark 是如何分发映射的,以及为什么某些节点可能无法在该映射中找到某些键?顺便说一句,我的 spark 版本是 1.6.0
我错过了什么?
更新
这部分是为了在驱动程序上初始化映射。
...
var pd = sc.textFile( "hdfs://...")
my_immutable_map = pd.map( line => line.split(":") ).map{ line => (line(0), line(1).split(","))}.collectAsMap
...
broadcast = sc.broadcast(my_immutable_map)
my_map_rdd = sc.parallelize( my_immutable_map.toSeq)
这是我出错的部分。
def my_func(key:String):String={
...
my_value = broadcast.value(key)
...
}
my_func 在地图内部被称为;
my_another_rdd.map{ line =>
val key = line.split(",")(0)
my_func(key)
}
我找到的解决方案是将广播值作为参数传递给函数。尽管如此,我还是找不到并行化方法的解决方案。
我的 class 中有一个不可变的地图。当我 运行 我的代码处于本地模式时,没有问题,我可以到达地图中的每个键。但是,当我 运行 我的代码处于集群模式时,节点会抛出关于在映射中找不到键的错误。
我目前尝试的就是这些;
-在集群上广播不可变映射。
broadcast = sc.broadcast(my_immutable_map)
-将地图并行化为对RDD
my_map_rdd = sc.parallelize( my_immutable_map.toSeq)
当我检查日志时,我看到密钥未找到异常。 我的错误堆栈跟踪如下:
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 4 times, most recent failure: Lost task 1.3 in stage 15.0 (TID 25, datanode1.big.com): java.util.NoSuchElementException: key not found: 905053199731
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at havelsan.CDRGenerator$.generate_random_target(CDRGenerator.scala:95)
at havelsan.CDRGenerator$$anonfun$main$$anonfun.apply(CDRGenerator.scala:167)
at havelsan.CDRGenerator$$anonfun$main$$anonfun.apply(CDRGenerator.scala:165)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
请您解释一下 spark 是如何分发映射的,以及为什么某些节点可能无法在该映射中找到某些键?顺便说一句,我的 spark 版本是 1.6.0
我错过了什么?
更新
这部分是为了在驱动程序上初始化映射。
...
var pd = sc.textFile( "hdfs://...")
my_immutable_map = pd.map( line => line.split(":") ).map{ line => (line(0), line(1).split(","))}.collectAsMap
...
broadcast = sc.broadcast(my_immutable_map)
my_map_rdd = sc.parallelize( my_immutable_map.toSeq)
这是我出错的部分。
def my_func(key:String):String={
...
my_value = broadcast.value(key)
...
}
my_func 在地图内部被称为;
my_another_rdd.map{ line =>
val key = line.split(",")(0)
my_func(key)
}
我找到的解决方案是将广播值作为参数传递给函数。尽管如此,我还是找不到并行化方法的解决方案。