如何将 Spark 的累加器传递给函数?

How can I pass Spark's accumulator to a function?

我想做这样的事情。

val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....

上面代码中 the_accumulator_ojbect 的位置应该是什么?写成 ac 就可以了吗?

另外,在函数中

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
    .....
}

上面函数中TypeOfAccumulator的位置应该是什么?

可以找到有关 Spark 蓄电池的其他信息here

根据关于创建累加器的 scala 文档:

/** * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display * in the Spark UI. Tasks can "add" values to the accumulator using the += method. Only the * driver can access the accumulator's value. */

默认的累加器类型是int。虽然您可以设置自己的类型,但需要正确实现 += 方法以将值添加到您自己的累加器类型:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")

您的主要代码片段如下:

val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
System.out.println("My accumulator value is: " + ac.value)

someFunction 方法植入的位置如下:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
    ...
    ac += 1
    ...
}