reduceByKey:它在内部是如何工作的?
reduceByKey: How does it work internally?
我是 Spark 和 Scala 的新手。我对 reduceByKey 函数在 Spark 中的工作方式感到困惑。假设我们有以下代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
map 函数很清楚:s 是键,它指向 data.txt
的行,1 是值。
但是,我不明白 reduceByKey 在内部是如何工作的? "a" 是否指向密钥?或者,"a" 是否指向 "s"?那么a+b代表什么?它们是如何填充的?
在你的例子中
val counts = pairs.reduceByKey((a,b) => a+b)
a
和 b
都是 pairs
中 _2
元组的 Int
累加器。 reduceKey
将取两个具有相同值 s
的元组,并将它们的 _2
值用作 a
和 b
,生成一个新的 Tuple[String,Int]
。重复此操作,直到每个键只有一个元组 s
.
不同于非Spark(或者,实际上,非并行)reduceByKey
,其中第一个元素始终是累加器,第二个元素是一个值,reduceByKey
以分布式方式运行,即每个节点会将其元组集缩减为 唯一键控 元组的集合,然后从多个节点缩减元组,直到有最终的 唯一键控 元组集。这意味着随着节点的结果减少,a
和 b
代表已经减少的累加器。
让我们将其分解为离散的方法和类型。这通常会暴露新开发人员的复杂性:
pairs.reduceByKey((a, b) => a + b)
变成
pairs.reduceByKey((a: Int, b: Int) => a + b)
重命名变量使其更加明确
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
因此,我们现在可以看到,我们只是简单地为给定的键取一个累加值,然后将它与该键的下一个值相加。现在,让我们进一步分解它,以便我们了解关键部分。所以,让我们把这个方法想象得更像这样:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
//Turn the accumulated value into a true key->value mapping
val accumAsMap = accumulatedValue.toMap
//Try to get the key's current value if we've already encountered it
accumAsMap.get(currentValue._1) match {
//If we have encountered it, then add the new value to the existing value and overwrite the old
case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
//If we have NOT encountered it, then simply add it to the list
case None => currentValue :: accumulatedValue
}
})
因此,您可以看到 reduceByKey 采用了查找密钥并跟踪它的样板,因此您不必担心管理该部分。
更深入、更真实
综上所述,这只是所发生情况的简化版本,因为此处进行了一些优化。此操作是关联的,因此火花引擎将首先在本地执行这些归约(通常称为映射端归约),然后再次在驱动程序处执行。这样可以节省网络流量;它不是发送所有数据并执行操作,而是可以将其尽可能小地减少,然后通过网络发送该减少。
reduceByKey
函数的一个要求是它必须是关联的。为了对 reduceByKey
的工作原理建立一些直觉,让我们首先看看关联关联函数如何帮助我们进行并行计算:
正如我们所见,我们可以将原始集合分成几部分,然后通过应用关联函数,我们可以累加一个总和。顺序的情况很简单,我们习惯了:1+2+3+4+5+6+7+8+9+10.
关联性让我们可以按顺序并行使用相同的函数。 reduceByKey
使用 属性 从 RDD 计算结果,RDD 是一个由分区组成的分布式集合。
考虑以下示例:
// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))
在 spark 中,数据分布到分区中。对于下一个图示,(4) 分区在左侧,用细线包围。首先,我们将函数本地应用于每个分区,在分区中按顺序进行,但我们 运行 所有 4 个分区并行。然后,通过应用相同的函数再次聚合每个本地计算的结果,最后得出一个结果。
reduceByKey
是 aggregateByKey
的特化 aggregateByKey
有两个函数:一个应用于每个分区(顺序),另一个应用于每个分区的结果(在平行下)。 reduceByKey
在这两种情况下使用相同的关联函数:对每个分区进行顺序计算,然后将这些结果组合成我们在此处说明的最终结果。
Spark RDD reduceByKey 函数使用关联归约函数合并每个键的值。
reduceByKey 函数仅适用于 RDD,这是一个转换操作,意味着它是惰性计算的。并且关联函数作为参数传递,它应用于源RDD并创建一个新的RDD作为结果。
因此在您的示例中,rdd 对具有一组多个成对元素,例如 (s1,1)、(s2,1) 等。并且 reduceByKey 接受一个函数 (accumulator, n) => (accumulator + n) ,它将累加器变量初始化为默认值 0 并将每个键的元素相加 return 结果 rdd 计数具有与键配对的总计数。
我是 Spark 和 Scala 的新手。我对 reduceByKey 函数在 Spark 中的工作方式感到困惑。假设我们有以下代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
map 函数很清楚:s 是键,它指向 data.txt
的行,1 是值。
但是,我不明白 reduceByKey 在内部是如何工作的? "a" 是否指向密钥?或者,"a" 是否指向 "s"?那么a+b代表什么?它们是如何填充的?
在你的例子中
val counts = pairs.reduceByKey((a,b) => a+b)
a
和 b
都是 pairs
中 _2
元组的 Int
累加器。 reduceKey
将取两个具有相同值 s
的元组,并将它们的 _2
值用作 a
和 b
,生成一个新的 Tuple[String,Int]
。重复此操作,直到每个键只有一个元组 s
.
不同于非Spark(或者,实际上,非并行)reduceByKey
,其中第一个元素始终是累加器,第二个元素是一个值,reduceByKey
以分布式方式运行,即每个节点会将其元组集缩减为 唯一键控 元组的集合,然后从多个节点缩减元组,直到有最终的 唯一键控 元组集。这意味着随着节点的结果减少,a
和 b
代表已经减少的累加器。
让我们将其分解为离散的方法和类型。这通常会暴露新开发人员的复杂性:
pairs.reduceByKey((a, b) => a + b)
变成
pairs.reduceByKey((a: Int, b: Int) => a + b)
重命名变量使其更加明确
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
因此,我们现在可以看到,我们只是简单地为给定的键取一个累加值,然后将它与该键的下一个值相加。现在,让我们进一步分解它,以便我们了解关键部分。所以,让我们把这个方法想象得更像这样:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
//Turn the accumulated value into a true key->value mapping
val accumAsMap = accumulatedValue.toMap
//Try to get the key's current value if we've already encountered it
accumAsMap.get(currentValue._1) match {
//If we have encountered it, then add the new value to the existing value and overwrite the old
case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
//If we have NOT encountered it, then simply add it to the list
case None => currentValue :: accumulatedValue
}
})
因此,您可以看到 reduceByKey 采用了查找密钥并跟踪它的样板,因此您不必担心管理该部分。
更深入、更真实
综上所述,这只是所发生情况的简化版本,因为此处进行了一些优化。此操作是关联的,因此火花引擎将首先在本地执行这些归约(通常称为映射端归约),然后再次在驱动程序处执行。这样可以节省网络流量;它不是发送所有数据并执行操作,而是可以将其尽可能小地减少,然后通过网络发送该减少。
reduceByKey
函数的一个要求是它必须是关联的。为了对 reduceByKey
的工作原理建立一些直觉,让我们首先看看关联关联函数如何帮助我们进行并行计算:
正如我们所见,我们可以将原始集合分成几部分,然后通过应用关联函数,我们可以累加一个总和。顺序的情况很简单,我们习惯了:1+2+3+4+5+6+7+8+9+10.
关联性让我们可以按顺序并行使用相同的函数。 reduceByKey
使用 属性 从 RDD 计算结果,RDD 是一个由分区组成的分布式集合。
考虑以下示例:
// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))
在 spark 中,数据分布到分区中。对于下一个图示,(4) 分区在左侧,用细线包围。首先,我们将函数本地应用于每个分区,在分区中按顺序进行,但我们 运行 所有 4 个分区并行。然后,通过应用相同的函数再次聚合每个本地计算的结果,最后得出一个结果。
reduceByKey
是 aggregateByKey
的特化 aggregateByKey
有两个函数:一个应用于每个分区(顺序),另一个应用于每个分区的结果(在平行下)。 reduceByKey
在这两种情况下使用相同的关联函数:对每个分区进行顺序计算,然后将这些结果组合成我们在此处说明的最终结果。
Spark RDD reduceByKey 函数使用关联归约函数合并每个键的值。
reduceByKey 函数仅适用于 RDD,这是一个转换操作,意味着它是惰性计算的。并且关联函数作为参数传递,它应用于源RDD并创建一个新的RDD作为结果。
因此在您的示例中,rdd 对具有一组多个成对元素,例如 (s1,1)、(s2,1) 等。并且 reduceByKey 接受一个函数 (accumulator, n) => (accumulator + n) ,它将累加器变量初始化为默认值 0 并将每个键的元素相加 return 结果 rdd 计数具有与键配对的总计数。