自定义聚合器中的 Spark 累加器
Spark Accumulator in custom Aggregator
我有一个自定义聚合器,在 reduce 方法中我想使用一个累加器来做一些统计。
我应该如何将累加器传递给聚合器?
我必须将累加器作为构造函数参数传递还是必须使用 AccumulatorContext.get(0)
?
在您的聚合器之外创建累加器(以及在执行节点上 运行 的任何其他代码之外)并在聚合器中使用它。变量本身可以像任何其他普通变量一样传递。
val ds = sc.parallelize(1 to 5).toDS()
val acc = sc.longAccumulator
val mySumAgg = new Aggregator[Int, Int, Int] {
def reduce(b: Int, a: Int): Int = {
acc.add(1)
a + b
}
[...]
}.toColumn
ds.groupByKey( i => i)
.agg(mySumAgg)
.show()
print("Merge has been called " + acc.value + " times")
如果您为聚合器创建了单独的 class,则可以通过构造函数传递累加器,或者您可以使用 setter。
你不应该使用 AccumulatorContext,正如文档所说:
An internal class used to track accumulators by Spark itself.
我有一个自定义聚合器,在 reduce 方法中我想使用一个累加器来做一些统计。
我应该如何将累加器传递给聚合器?
我必须将累加器作为构造函数参数传递还是必须使用 AccumulatorContext.get(0)
?
在您的聚合器之外创建累加器(以及在执行节点上 运行 的任何其他代码之外)并在聚合器中使用它。变量本身可以像任何其他普通变量一样传递。
val ds = sc.parallelize(1 to 5).toDS()
val acc = sc.longAccumulator
val mySumAgg = new Aggregator[Int, Int, Int] {
def reduce(b: Int, a: Int): Int = {
acc.add(1)
a + b
}
[...]
}.toColumn
ds.groupByKey( i => i)
.agg(mySumAgg)
.show()
print("Merge has been called " + acc.value + " times")
如果您为聚合器创建了单独的 class,则可以通过构造函数传递累加器,或者您可以使用 setter。
你不应该使用 AccumulatorContext,正如文档所说:
An internal class used to track accumulators by Spark itself.