Can we use an outer map object in the spark.map function?

I am new to Scala and functional programming. I have the following Spark code snippet:

case class SPR(symbol:String, splitOrg:Double, splitAdj:Double, timeStamp: String, unx_tt: Int)

var oldFct = 15.0
val splitMap = collection.mutable.Map[String, Double]()

val tmp = splitsData.map { row =>
    val sym = row(0).toString
    val splitOrg = row(12).toString.toDouble
    // oldFct and splitMap are both captured from the enclosing scope
    oldFct = splitMap.getOrElse(sym, 1.0)
    val newFct = splitOrg * oldFct
    splitMap += (sym -> newFct)
    SPR(sym, splitOrg, newFct, row(10).toString, row(13).toString.toInt)
}.collect()

println("MAP ===========" + splitMap.size)

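From what I observe, I can use primitive data types inside the block, but with the Map object I always get a size of 0. So it seems that no key-value pairs are being added.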

Thanks in advance.

Read "Understanding closures" in the Spark documentation. The most relevant parts (just replace counter with your splitMap):

RDD operations that modify variables outside of their scope can be a frequent source of confusion...

The primary challenge is that the behavior of the above code is undefined. In local mode with a single JVM, the above code will sum the values within the RDD and store it in counter. This is because both the RDD and the variable counter are in the same memory space on the driver node.

However, in cluster mode, what happens is more complicated, and the above may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks - each of which is operated on by an executor. Prior to execution, Spark computes the closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor. In local mode, there is only the one executor so everything shares the same closure. In other modes however, this is not the case and the executors running on separate worker nodes each have their own copy of the closure.

What is happening here is that the variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
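To make the "copies" point concrete without a cluster, here is a minimal sketch in plain Scala. It is only an analogy: it round-trips a mutable map through Java serialization, which is roughly what happens to state captured in a closure before it reaches an executor. The names ClosureCopyDemo, roundTrip, driverMap and executorMap are made up for illustration and are not Spark APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object ClosureCopyDemo {
  // Serializes and deserializes a value, producing an independent copy.
  // This stands in for shipping captured state to an executor (an analogy, not Spark itself).
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // The "driver-side" map, empty as in the question.
  val driverMap: mutable.Map[String, Double] = mutable.HashMap.empty[String, Double]

  // The "executor" receives a deserialized copy and updates only that copy.
  val executorMap: mutable.Map[String, Double] = roundTrip(driverMap)
  executorMap += ("AAPL" -> 2.0)

  def main(args: Array[String]): Unit = {
    println("executor copy size: " + executorMap.size) // 1
    println("driver map size: " + driverMap.size)      // 0
  }
}
```

The update lands in the copy, so the original map stays empty, which matches the size of 0 seen in the question.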

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
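As a rough sketch of what the accumulator approach looks like for the counter case, assuming Spark 2.x or later on the classpath and a local master chosen purely for illustration (the object name AccumulatorSketch and the helper sumWithAccumulator are hypothetical):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object AccumulatorSketch {
  // Adds every element to an accumulator on the executors,
  // then reads the merged total back on the driver.
  def sumWithAccumulator(sc: SparkContext, data: Seq[Int]): Long = {
    val counter = sc.longAccumulator("counter")
    sc.parallelize(data).foreach(x => counter.add(x.toLong))
    counter.value
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local[2] is used only so the sketch runs on one machine.
    val spark = SparkSession.builder()
      .appName("accumulator-sketch")
      .master("local[2]")
      .getOrCreate()

    println("Counter value: " + sumWithAccumulator(spark.sparkContext, Seq(1, 2, 3, 4))) // 10
    spark.stop()
  }
}
```

Note that executors can only add to an accumulator; its value is meant to be read on the driver. That means an accumulator cannot replace the splitMap.getOrElse lookup that the question performs inside map.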

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
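For the splitMap in the question specifically, one possible alternative to an accumulator is a keyed aggregation. This is a different technique from the one the quoted guide recommends, so treat it as a sketch under assumptions: the (symbol, ratio) pairs below are hypothetical stand-ins for row(0) and row(12), and SplitFactorSketch and finalFactors are illustrative names. It computes only the final factor per symbol, which is what splitMap would hold at the end; the running per-row splitAdj depends on row order and is not covered here.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SplitFactorSketch {
  // Multiplies the split ratios per symbol on the executors and
  // returns the (small) per-symbol result to the driver as a map.
  def finalFactors(splits: RDD[(String, Double)]): scala.collection.Map[String, Double] =
    splits.reduceByKey(_ * _).collectAsMap()

  def main(args: Array[String]): Unit = {
    // Assumption: local[2] is used only so the sketch runs on one machine.
    val spark = SparkSession.builder()
      .appName("split-factor-sketch")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical input standing in for (row(0).toString, row(12).toString.toDouble).
    val splits = sc.parallelize(Seq(("AAPL", 2.0), ("AAPL", 7.0), ("MSFT", 2.0)))

    val splitMap = finalFactors(splits)
    println("MAP ===========" + splitMap.size) // 2
    spark.stop()
  }
}
```

Because the map is built from the result of a Spark operation rather than mutated inside the closure, it is populated on the driver regardless of how many executors ran the job.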