Spark:当我在 Range 中使用蓄能器时,蓄能器无法正常工作
Spark: Accumulators does not work properly when I use it in Range
我不明白为什么我的累加器没有被 Spark 正确更新。
object AccumulatorsExample extends App {
val acc = sc.accumulator(0L, "acc")
sc range(0, 20000, step = 25) map { _ => acc += 1 } count()
assert(acc.value == 800) // not equals
}
我的 Spark 配置:
setMaster("local[*]") // should use 8 cpu cores
我不确定 Spark 是否将累加器的计算分布在每个核心上,也许这就是问题所在。
我的问题是如何将所有 acc
值汇总到一个总和中并获得正确的累加器值 (800)?
PS
如果我限制核心数 setMaster("local[1]")
则一切正常。
这里有两个不同的问题:
您正在扩展 App
而不是实施 main
方法。有一些与此方法相关的已知问题,包括不正确的累加器行为,因此 it shouldn't be used in Spark applications。这很可能是问题的根源。
有关扩展 App
.
的其他可能问题,请参阅 SPARK-4170 示例
您正在转换中使用累加器。这意味着累加器可以递增任意次数(给定作业成功时至少递增一次)。
一般来说,你需要精确的结果,你应该只在像 foreach
和 foreachPartition
这样的动作中使用累加器,尽管你不太可能在这样的玩具应用程序中遇到任何问题。
我不明白为什么我的累加器没有被 Spark 正确更新。
object AccumulatorsExample extends App {
val acc = sc.accumulator(0L, "acc")
sc range(0, 20000, step = 25) map { _ => acc += 1 } count()
assert(acc.value == 800) // not equals
}
我的 Spark 配置:
setMaster("local[*]") // should use 8 cpu cores
我不确定 Spark 是否将累加器的计算分布在每个核心上,也许这就是问题所在。
我的问题是如何将所有 acc
值汇总到一个总和中并获得正确的累加器值 (800)?
PS
如果我限制核心数 setMaster("local[1]")
则一切正常。
这里有两个不同的问题:
您正在扩展
App
而不是实施main
方法。有一些与此方法相关的已知问题,包括不正确的累加器行为,因此 it shouldn't be used in Spark applications。这很可能是问题的根源。有关扩展
App
. 的其他可能问题,请参阅 SPARK-4170 示例
您正在转换中使用累加器。这意味着累加器可以递增任意次数(给定作业成功时至少递增一次)。
一般来说,你需要精确的结果,你应该只在像
foreach
和foreachPartition
这样的动作中使用累加器,尽管你不太可能在这样的玩具应用程序中遇到任何问题。