reduceByKey:它在内部是如何工作的?

reduceByKey: How does it work internally?

我是 Spark 和 Scala 的新手。我对 reduceByKey 函数在 Spark 中的工作方式感到困惑。假设我们有以下代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

map 函数很清楚:s 是键,它指向 data.txt 的行,1 是值。

但是,我不明白 reduceByKey 在内部是如何工作的? "a" 是否指向密钥?或者,"a" 是否指向 "s"?那么a+b代表什么?它们是如何填充的?

在你的例子中

val counts = pairs.reduceByKey((a,b) => a+b)

ab 都是 pairs_2 元组的 Int 累加器。 reduceKey 将取两个具有相同值 s 的元组,并将它们的 _2 值用作 ab,生成一个新的 Tuple[String,Int]。重复此操作,直到每个键只有一个元组 s.

不同于非Spark(或者,实际上,非并行)reduceByKey,其中第一个元素始终是累加器,第二个元素是一个值,reduceByKey 以分布式方式运行,即每个节点会将其元组集缩减为 唯一键控 元组的集合,然后从多个节点缩减元组,直到有最终的 唯一键控 元组集。这意味着随着节点的结果减少,ab 代表已经减少的累加器。

让我们将其分解为离散的方法和类型。这通常会暴露新开发人员的复杂性:

pairs.reduceByKey((a, b) => a + b)

变成

pairs.reduceByKey((a: Int, b: Int) => a + b)

重命名变量使其更加明确

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)

因此,我们现在可以看到,我们只是简单地为给定的键取一个累加值,然后将它与该键的下一个值相加。现在,让我们进一步分解它,以便我们了解关键部分。所以,让我们把这个方法想象得更像这样:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  //Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap   
  //Try to get the key's current value if we've already encountered it
  accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    //If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue 
  }
})

因此,您可以看到 reduceByKey 采用了查找密钥并跟踪它的样板,因此您不必担心管理该部分。

更深入、更真实

综上所述,这只是所发生情况的简化版本,因为此处进行了一些优化。此操作是关联的,因此火花引擎将首先在本地执行这些归约(通常称为映射端归约),然后再次在驱动程序处执行。这样可以节省网络流量;它不是发送所有数据并执行操作,而是可以将其尽可能小地减少,然后通过网络发送该减少。

reduceByKey 函数的一个要求是它必须是关联的。为了对 reduceByKey 的工作原理建立一些直觉,让我们首先看看关联关联函数如何帮助我们进行并行计算:

正如我们所见,我们可以将原始集合分成几部分,然后通过应用关联函数,我们可以累加一个总和。顺序的情况很简单,我们习惯了:1+2+3+4+5+6+7+8+9+10.

关联性让我们可以按顺序并行使用相同的函数。 reduceByKey 使用 属性 从 RDD 计算结果,RDD 是一个由分区组成的分布式集合。

考虑以下示例:

// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))

在 spark 中,数据分布到分区中。对于下一个图示,(4) 分区在左侧,用细线包围。首先,我们将函数本地应用于每个分区,在分区中按顺序进行,但我们 运行 所有 4 个分区并行。然后,通过应用相同的函数再次聚合每个本地计算的结果,最后得出一个结果。

reduceByKeyaggregateByKey 的特化 aggregateByKey 有两个函数:一个应用于每个分区(顺序),另一个应用于每个分区的结果(在平行下)。 reduceByKey 在这两种情况下使用相同的关联函数:对每个分区进行顺序计算,然后将这些结果组合成我们在此处说明的最终结果。

Spark RDD reduceByKey 函数使用关联归约函数合并每个键的值。

reduceByKey 函数仅适用于 RDD,这是一个转换操作,意味着它是惰性计算的。并且关联函数作为参数传递,它应用于源RDD并创建一个新的RDD作为结果。

因此在您的示例中,rdd 对具有一组多个成对元素,例如 (s1,1)、(s2,1) 等。并且 reduceByKey 接受一个函数 (accumulator, n) => (accumulator + n) ,它将累加器变量初始化为默认值 0 并将每个键的元素相加 return 结果 rdd 计数具有与键配对的总计数。