Spark:当我在 Range 中使用蓄能器时,蓄能器无法正常工作

Spark: Accumulators does not work properly when I use it in Range

我不明白为什么我的累加器没有被 Spark 正确更新。

object AccumulatorsExample extends App {
  val acc = sc.accumulator(0L, "acc")
  sc range(0, 20000, step = 25) map { _ => acc += 1 } count()
  assert(acc.value == 800) // not equals
}

我的 Spark 配置:

setMaster("local[*]") // should use 8 cpu cores

我不确定 Spark 是否将累加器的计算分布在每个核心上,也许这就是问题所在。

我的问题是如何将所有 acc 值汇总到一个总和中并获得正确的累加器值 (800)?

PS

如果我限制核心数 setMaster("local[1]") 则一切正常。

这里有两个不同的问题:

  • 您正在扩展 App 而不是实施 main 方法。有一些与此方法相关的已知问题,包括不正确的累加器行为,因此 it shouldn't be used in Spark applications。这很可能是问题的根源。

    有关扩展 App.

  • 的其他可能问题,请参阅 SPARK-4170 示例
  • 您正在转换中使用累加器。这意味着累加器可以递增任意次数(给定作业成功时至少递增一次)。

    一般来说,你需要精确的结果,你应该只在像 foreachforeachPartition 这样的动作中使用累加器,尽管你不太可能在这样的玩具应用程序中遇到任何问题。