如何将 Spark 的累加器传递给函数?
How can I pass Spark's accumulator to a function?
我想做这样的事情。
val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....
上面代码中 the_accumulator_ojbect
的位置应该是什么?写成 ac
就可以了吗?
另外,在函数中
def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
.....
}
上面函数中TypeOfAccumulator
的位置应该是什么?
可以找到有关 Spark 蓄电池的其他信息here
根据关于创建累加器的 scala 文档:
/** * Create an [[org.apache.spark.Accumulator]] variable of a
given type, with a name for display * in the Spark UI. Tasks can
"add" values to the accumulator using the +=
method. Only the *
driver can access the accumulator's value
. */
默认的累加器类型是int
。虽然您可以设置自己的类型,但需要正确实现 +=
方法以将值添加到您自己的累加器类型:
val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
您的主要代码片段如下:
val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
System.out.println("My accumulator value is: " + ac.value)
someFunction
方法植入的位置如下:
def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
...
ac += 1
...
}
我想做这样的事情。
val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....
上面代码中 the_accumulator_ojbect
的位置应该是什么?写成 ac
就可以了吗?
另外,在函数中
def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
.....
}
上面函数中TypeOfAccumulator
的位置应该是什么?
可以找到有关 Spark 蓄电池的其他信息here
根据关于创建累加器的 scala 文档:
/** * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display * in the Spark UI. Tasks can "add" values to the accumulator using the
+=
method. Only the * driver can access the accumulator'svalue
. */
默认的累加器类型是int
。虽然您可以设置自己的类型,但需要正确实现 +=
方法以将值添加到您自己的累加器类型:
val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
您的主要代码片段如下:
val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
System.out.println("My accumulator value is: " + ac.value)
someFunction
方法植入的位置如下:
def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
...
ac += 1
...
}