是否可以在 Spark 的每个分区重置长累加器?

Is it possible to Reset long accumulator at each partition in Spark?

我正在尝试在每个分区上重置 longAccumulator,但无法这样做。我正在尝试这种方式。

val list = (1 to 100).toList
val rdd = sc.parallelize(list,4)
val acc = sc.longAccumulator("1L")

rdd.mapPartitionsWithIndex{(i,iterator) => 
acc.reset()
acc.add(iterator.sum)
iterator
}

目前此代码不会在每个分区重置累加器。在驱动程序中,我们可以通过调用 reset() 方法将累加器重置为零。我想问一下是否可以为每个分区重置累加器。

我有n个分区。我想将每个分区的值总和存储在 List 中。对于分区 0,其总和应存储在列表的索引 0 上,依此类推。

在大多数情况下,分区中的值(或它们的总和)是无趣的并且容易发生变化。但是,仍然可以计算。


您不想使用累加器对每个分区的值求和。相反,您可以简单地计算总和并将它们 return 作为一个新的 RDD。

要按分区顺序获取总和列表,return 具有总和的索引并用它排序。然后删除它。

rdd.mapPartitionsWithIndex{(i,iterator) => 
  Seq((i, iterator.reduce(_ + _))).toIterator
}.collect().sortBy(_._1).map(_._2)

这将为您提供一个数组,其中总和按顺序排列。

根据您的最终游戏,您可能想要使用自定义地图累加器。 看看this。使用方法如下:

val myAcc = new ByKeyAdditiveAccumulator[Int, Long]
sparkContext.register(myAcc)
...
rdd.foreachPartition(partition => 
   acc.add((TaskContext.get.partitionId(), partition.size))
...
import scala.collection.JavaConverters._
val partitionCount = myAcc.value.asScala