是否可以在 Spark 的每个分区重置长累加器?
Is it possible to Reset long accumulator at each partition in Spark?
我正在尝试在每个分区上重置 longAccumulator
,但无法这样做。我正在尝试这种方式。
val list = (1 to 100).toList
val rdd = sc.parallelize(list,4)
val acc = sc.longAccumulator("1L")
rdd.mapPartitionsWithIndex{(i,iterator) =>
acc.reset()
acc.add(iterator.sum)
iterator
}
目前此代码不会在每个分区重置累加器。在驱动程序中,我们可以通过调用 reset()
方法将累加器重置为零。我想问一下是否可以为每个分区重置累加器。
我有n个分区。我想将每个分区的值总和存储在 List
中。对于分区 0,其总和应存储在列表的索引 0 上,依此类推。
在大多数情况下,分区中的值(或它们的总和)是无趣的并且容易发生变化。但是,仍然可以计算。
您不想使用累加器对每个分区的值求和。相反,您可以简单地计算总和并将它们 return 作为一个新的 RDD。
要按分区顺序获取总和列表,return 具有总和的索引并用它排序。然后删除它。
rdd.mapPartitionsWithIndex{(i,iterator) =>
Seq((i, iterator.reduce(_ + _))).toIterator
}.collect().sortBy(_._1).map(_._2)
这将为您提供一个数组,其中总和按顺序排列。
根据您的最终游戏,您可能想要使用自定义地图累加器。
看看this。使用方法如下:
val myAcc = new ByKeyAdditiveAccumulator[Int, Long]
sparkContext.register(myAcc)
...
rdd.foreachPartition(partition =>
acc.add((TaskContext.get.partitionId(), partition.size))
...
import scala.collection.JavaConverters._
val partitionCount = myAcc.value.asScala
我正在尝试在每个分区上重置 longAccumulator
,但无法这样做。我正在尝试这种方式。
val list = (1 to 100).toList
val rdd = sc.parallelize(list,4)
val acc = sc.longAccumulator("1L")
rdd.mapPartitionsWithIndex{(i,iterator) =>
acc.reset()
acc.add(iterator.sum)
iterator
}
目前此代码不会在每个分区重置累加器。在驱动程序中,我们可以通过调用 reset()
方法将累加器重置为零。我想问一下是否可以为每个分区重置累加器。
我有n个分区。我想将每个分区的值总和存储在 List
中。对于分区 0,其总和应存储在列表的索引 0 上,依此类推。
在大多数情况下,分区中的值(或它们的总和)是无趣的并且容易发生变化。但是,仍然可以计算。
您不想使用累加器对每个分区的值求和。相反,您可以简单地计算总和并将它们 return 作为一个新的 RDD。
要按分区顺序获取总和列表,return 具有总和的索引并用它排序。然后删除它。
rdd.mapPartitionsWithIndex{(i,iterator) =>
Seq((i, iterator.reduce(_ + _))).toIterator
}.collect().sortBy(_._1).map(_._2)
这将为您提供一个数组,其中总和按顺序排列。
根据您的最终游戏,您可能想要使用自定义地图累加器。 看看this。使用方法如下:
val myAcc = new ByKeyAdditiveAccumulator[Int, Long]
sparkContext.register(myAcc)
...
rdd.foreachPartition(partition =>
acc.add((TaskContext.get.partitionId(), partition.size))
...
import scala.collection.JavaConverters._
val partitionCount = myAcc.value.asScala