Spark 累加器:正确的累加器有时是多个还是总是一个?

Spark Accumulators: Is the right accumulator sometimes many or always one?

我正在尝试使用 Spark 累加器按查询删除性能不佳的组。

import org.apache.spark._
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {

  def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    Map.empty[Int, Set[Int]]
  }

  def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    val keys = m1.keys ++ m2.keys
     keys.map((k: Int) => (k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int])))).toMap
  }
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)
srch_destination_id_distinct.foreach(r => try{accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))} catch {case ioe: NumberFormatException =>  Map.empty[Int, Set[Int]]})

在我的累加器中,我假设 m2 不会始终是在我的 foreach 循环中创建的单个项目集,并且有时 Spark 会使用此方法添加两个不同的映射,这些映射具有不止一个键.但是正因为如此,我的表现很低。正确的 Map 是否总是通过一个项目进入累加器,从我的 for each 循环中设置,还是我需要做出这种性能权衡?

你通常应该避免使用 Accumulators 除了调试之外的任何事情,因为据我所知,不能保证 RDD 的每个条目只会 "added" Accumulator 正好一次。

也许可以试试这样:

import scala.collection.mutable.HashSet
import scala.util.Try

val result = srch_destination_id_distinct.flatMap(r => 
  Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
  (set, n) => set += n,
  (set1, set2) => set1 union set2
).mapValues(_.toSet).collectAsMap

aggregate 方法的 seqOpcombOp 参数之间的区别还允许我们避免 "wrapping" RDD 中的每个元素 Map[Int, Set[Int]] 就像你用你的方法做的那样。