如何在 Spark 中正确使用 Accumulator 以获得正确答案?

How to use Accumulator correctly in Spark to get right answer?

我是 scala 和 spark 的新手。我想在函数内部的 spark 中使用 Accumulators 来递增,因为函数被 map 函数调用。

我有一个名为vertices的RDD,RDD的每一行都有一个元组2,ID及其属性(键,值),例如:

(1,1)
(2,1)
.
.
.
(34,1)

我想使用累加器在键 % 2 等于零时递增。如果它等于零,累加器将增加 1,我们将有一个 tuple2,其键等于 ID,值将是累加器,如果键 %2 不等于零,我们将有一个 tuple2 与键等于 ID,值等于最后一个累加器值。

val label_counter = sc.accumulator(0,"label_counter")  

def do_someThing (vertex:VertexId): (VertexId, Accumulator[Int]) = {
    if (vertex % 2 == 0) {
        label_counter +=1
        return (vertex,label_counter)
    } else return (vertex, label_counter)
}

val accum_test = vertices.map(x => (x._1)).map(do_someThing )
accum_test.foreach(println)

在这种情况下,结果是这样的:

 (2,1)
 (13,1)
 (19,1)
 (34,2)
 (15,2)
 (4,3)
 ..... 

这个结果是我所期望的。键是节点 ID,值是每次映射调用中的累加器值。

但是当我使用 accum_test.collect().foreach(println) 而不是最后一行代码时,结果如下所示:

 (2,17)
 (13,17)
 (19,17)
 (34,17)
 (15,17)
 (4,17)
 ....

在这种情况下,当我使用 collect 时,tuple2 的值部分全部为 17。它是累加器采用的最后一个值。我不期待这个答案

我不知道为什么会这样,错误在哪里??我应该如何以正确的方式编写此代码??。我想我对累加器概念有一些问题。

我还有一个问题。未排序的节点,例如 2,13,19,34,15,4 和 ... 意味着这些数字分布在不同的执行者上?并分发它们使它们变得无序?因为它们是文本文件中的有序编号。

请帮我解决这些问题。谢谢

您将累加器用于不适合的用途。累加器的目的是在 RDD 上积累一些东西,并将积累的东西返回给驱动程序。该值不打算在执行程序中使用,并且未定义执行时的结果,这就是为什么您会根据看似无关的细节得到不同的结果。请记住,RDD 是分布式的,并且在各个分区上并行累积 运行。这就是为什么在执行器中访问值的结果是意外的。

为了说明我之前说的。正确的用例是:

vertices.foreach(do_something) // accumulating
println(label_counter.value) // result on the driver