Spark 累加器:正确的累加器有时是多个还是总是一个?
Spark Accumulators: Is the right accumulator sometimes many or always one?
我正在尝试使用 Spark 累加器按查询删除性能不佳的组。
import org.apache.spark._
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {
def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
Map.empty[Int, Set[Int]]
}
def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
val keys = m1.keys ++ m2.keys
keys.map((k: Int) => (k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int])))).toMap
}
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)
srch_destination_id_distinct.foreach(r => try{accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))} catch {case ioe: NumberFormatException => Map.empty[Int, Set[Int]]})
在我的累加器中,我假设 m2 不会始终是在我的 foreach 循环中创建的单个项目集,并且有时 Spark 会使用此方法添加两个不同的映射,这些映射具有不止一个键.但是正因为如此,我的表现很低。正确的 Map 是否总是通过一个项目进入累加器,从我的 for each 循环中设置,还是我需要做出这种性能权衡?
你通常应该避免使用 Accumulators
除了调试之外的任何事情,因为据我所知,不能保证 RDD
的每个条目只会 "added" Accumulator
正好一次。
也许可以试试这样:
import scala.collection.mutable.HashSet
import scala.util.Try
val result = srch_destination_id_distinct.flatMap(r =>
Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
(set, n) => set += n,
(set1, set2) => set1 union set2
).mapValues(_.toSet).collectAsMap
aggregate
方法的 seqOp
和 combOp
参数之间的区别还允许我们避免 "wrapping" RDD
中的每个元素 Map[Int, Set[Int]]
就像你用你的方法做的那样。
我正在尝试使用 Spark 累加器按查询删除性能不佳的组。
import org.apache.spark._
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {
def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
Map.empty[Int, Set[Int]]
}
def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
val keys = m1.keys ++ m2.keys
keys.map((k: Int) => (k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int])))).toMap
}
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)
srch_destination_id_distinct.foreach(r => try{accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))} catch {case ioe: NumberFormatException => Map.empty[Int, Set[Int]]})
在我的累加器中,我假设 m2 不会始终是在我的 foreach 循环中创建的单个项目集,并且有时 Spark 会使用此方法添加两个不同的映射,这些映射具有不止一个键.但是正因为如此,我的表现很低。正确的 Map 是否总是通过一个项目进入累加器,从我的 for each 循环中设置,还是我需要做出这种性能权衡?
你通常应该避免使用 Accumulators
除了调试之外的任何事情,因为据我所知,不能保证 RDD
的每个条目只会 "added" Accumulator
正好一次。
也许可以试试这样:
import scala.collection.mutable.HashSet
import scala.util.Try
val result = srch_destination_id_distinct.flatMap(r =>
Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
(set, n) => set += n,
(set1, set2) => set1 union set2
).mapValues(_.toSet).collectAsMap
aggregate
方法的 seqOp
和 combOp
参数之间的区别还允许我们避免 "wrapping" RDD
中的每个元素 Map[Int, Set[Int]]
就像你用你的方法做的那样。