如何在 Spark 中正确使用 Accumulator 以获得正确答案?
How to use Accumulator correctly in Spark to get right answer?
我是 scala 和 spark 的新手。我想在函数内部的 spark 中使用 Accumulators 来递增,因为函数被 map 函数调用。
我有一个名为vertices
的RDD,RDD的每一行都有一个元组2,ID及其属性(键,值),例如:
(1,1)
(2,1)
.
.
.
(34,1)
我想使用累加器在键 % 2 等于零时递增。如果它等于零,累加器将增加 1,我们将有一个 tuple2,其键等于 ID,值将是累加器,如果键 %2 不等于零,我们将有一个 tuple2 与键等于 ID,值等于最后一个累加器值。
val label_counter = sc.accumulator(0,"label_counter")
def do_someThing (vertex:VertexId): (VertexId, Accumulator[Int]) = {
if (vertex % 2 == 0) {
label_counter +=1
return (vertex,label_counter)
} else return (vertex, label_counter)
}
val accum_test = vertices.map(x => (x._1)).map(do_someThing )
accum_test.foreach(println)
在这种情况下,结果是这样的:
(2,1)
(13,1)
(19,1)
(34,2)
(15,2)
(4,3)
.....
这个结果是我所期望的。键是节点 ID,值是每次映射调用中的累加器值。
但是当我使用 accum_test.collect().foreach(println)
而不是最后一行代码时,结果如下所示:
(2,17)
(13,17)
(19,17)
(34,17)
(15,17)
(4,17)
....
在这种情况下,当我使用 collect 时,tuple2 的值部分全部为 17。它是累加器采用的最后一个值。我不期待这个答案
我不知道为什么会这样,错误在哪里??我应该如何以正确的方式编写此代码??。我想我对累加器概念有一些问题。
我还有一个问题。未排序的节点,例如 2,13,19,34,15,4 和 ... 意味着这些数字分布在不同的执行者上?并分发它们使它们变得无序?因为它们是文本文件中的有序编号。
请帮我解决这些问题。谢谢
您将累加器用于不适合的用途。累加器的目的是在 RDD 上积累一些东西,并将积累的东西返回给驱动程序。该值不打算在执行程序中使用,并且未定义执行时的结果,这就是为什么您会根据看似无关的细节得到不同的结果。请记住,RDD 是分布式的,并且在各个分区上并行累积 运行。这就是为什么在执行器中访问值的结果是意外的。
为了说明我之前说的。正确的用例是:
vertices.foreach(do_something) // accumulating
println(label_counter.value) // result on the driver
我是 scala 和 spark 的新手。我想在函数内部的 spark 中使用 Accumulators 来递增,因为函数被 map 函数调用。
我有一个名为vertices
的RDD,RDD的每一行都有一个元组2,ID及其属性(键,值),例如:
(1,1)
(2,1)
.
.
.
(34,1)
我想使用累加器在键 % 2 等于零时递增。如果它等于零,累加器将增加 1,我们将有一个 tuple2,其键等于 ID,值将是累加器,如果键 %2 不等于零,我们将有一个 tuple2 与键等于 ID,值等于最后一个累加器值。
val label_counter = sc.accumulator(0,"label_counter")
def do_someThing (vertex:VertexId): (VertexId, Accumulator[Int]) = {
if (vertex % 2 == 0) {
label_counter +=1
return (vertex,label_counter)
} else return (vertex, label_counter)
}
val accum_test = vertices.map(x => (x._1)).map(do_someThing )
accum_test.foreach(println)
在这种情况下,结果是这样的:
(2,1)
(13,1)
(19,1)
(34,2)
(15,2)
(4,3)
.....
这个结果是我所期望的。键是节点 ID,值是每次映射调用中的累加器值。
但是当我使用 accum_test.collect().foreach(println)
而不是最后一行代码时,结果如下所示:
(2,17)
(13,17)
(19,17)
(34,17)
(15,17)
(4,17)
....
在这种情况下,当我使用 collect 时,tuple2 的值部分全部为 17。它是累加器采用的最后一个值。我不期待这个答案
我不知道为什么会这样,错误在哪里??我应该如何以正确的方式编写此代码??。我想我对累加器概念有一些问题。
我还有一个问题。未排序的节点,例如 2,13,19,34,15,4 和 ... 意味着这些数字分布在不同的执行者上?并分发它们使它们变得无序?因为它们是文本文件中的有序编号。
请帮我解决这些问题。谢谢
您将累加器用于不适合的用途。累加器的目的是在 RDD 上积累一些东西,并将积累的东西返回给驱动程序。该值不打算在执行程序中使用,并且未定义执行时的结果,这就是为什么您会根据看似无关的细节得到不同的结果。请记住,RDD 是分布式的,并且在各个分区上并行累积 运行。这就是为什么在执行器中访问值的结果是意外的。
为了说明我之前说的。正确的用例是:
vertices.foreach(do_something) // accumulating
println(label_counter.value) // result on the driver