Spark:无法将 RDD 元素添加到闭包内的可变 HashMap 中

Spark: Cannot add RDD elements into a mutable HashMap inside a closure

我有以下代码,其中 rddMaporg.apache.spark.rdd.RDD[(String, (String, String))]myHashMapscala.collection.mutable.HashMap

我做了.saveAsTextFile("temp_out")来强制评估rddMap.map

然而,即使println(" t " + t)正在打印东西,后来myHashMap仍然只有一个我手动放在开头的元素("test1", ("10", "20"))rddMap 中的所有内容都不会放入 myHashMap

片段代码:

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))
rddMap.map { t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
}.saveAsTextFile("temp_out")

println(rddMap.count)
println(myHashMap.toString)

为什么我不能将 rddMap 中的元素放到我的 myHashMap 中?

这是您想要完成的工作示例。

val rddMap = sc.parallelize(Map("A" -> ("v", "v"), "B" -> ("d","d")).toSeq)
// Collects all the data in the RDD and converts the data to a Map
val myMap = rddMap.collect().toMap
myMap.foreach(println)

输出:

(A,(v,v))  
(B,(d,d))

这是与您发布的代码类似的代码

rddMap.map { t=> 
  println("t" + t)
  newHashMap.put(t._1, t._2)
  println(newHashMap.toString) 
}.collect

这是上述代码从 Spark shell

的输出
t(A,(v,v))  
Map(A -> (v,v), test1 -> (10,20))  
t(B,(d,d))  
Map(test1 -> (10,20), B -> (d,d))

在我看来,Spark 似乎复制了您的 HashMap 并将元素添加到 copied 映射中。

今天的 Spark 并不真正支持您尝试执行的操作。

请注意,每个用户定义的函数(例如,您在 map() 中添加的函数)都是一个闭包,它被序列化并推送给每个执行者。

因此,您在 map() 中拥有的所有内容都会被序列化并四处传输:

.map{ t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
}

基本上,您的 myHashMap 将被复制到每个刽子手,每个刽子手将更新它自己的 HashMap 版本。这就是为什么在执行结束时你的驱动程序中的 myHashMap 永远不会改变。 (驱动程序是 manages/orchestrates 您的 Spark 作业的 JVM。它是您定义 SparkContext 的地方。)

为了将驱动程序中定义的结构推送给所有执行者,您需要 broadcast 它们(参见 link here)。请注意,广播变量是只读的,因此再次说明,使用广播在这里对您没有帮助。

另一种方法是使用 Accumulators,但我觉得这些更适合汇总数值,例如求和、最大值、最小值等。也许您可以看看创建一个自定义累加器扩展 AccumulatorParam。参见 link here

回到最初的问题,如果你想收集值给你的驱动程序,目前最好的方法是转换你的 RDD,直到它们变成一个小的和可管理的元素集合,然后你 collect() 这个 final/small RDD.