reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异

Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey

谁能解释一下 reducebykeygroupbykeyaggregatebykeycombinebykey 之间的区别?我已经阅读了有关此的文档,但无法理解确切的区别。

最好能有例子说明。

ReduceByKey reduceByKey(func, [numTasks])-

合并数据后,每个分区的每个键至少应有一个值。 然后洗牌发生,它通过网络发送到某个特定的执行者以执行一些操作,例如减少。

GroupByKey - groupByKey([numTasks])

它不会合并键的值,而是直接进行洗牌过程 这里有很多数据被发送到每个分区,几乎与初始数据相同。

每个键值的合并是在洗牌之后完成的。 这里有大量数据存储在最终工作节点上,因此导致内存不足问题。

AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 它类似于 reduceByKey 但您可以在执行聚合时提供初始值。

使用reduceByKey

  • reduceByKey可以在我们运行大数据集的时候使用。

  • reduceByKey当输入输出值类型相同时 超过 aggregateByKey

此外,建议不要使用 groupByKey 而更喜欢 reduceByKey。详情可以参考here.

您也可以参考此 question 以更详细地了解 reduceByKeyaggregateByKey

  • groupByKey() 只是根据键对数据集进行分组。当 RDD 尚未分区时,将导致数据混洗。
  • reduceByKey() 类似于分组+聚合。我们可以说 reduceByKey() 等同于 dataset.group(...).reduce(...)。与 groupByKey().
  • 不同,它会随机播放更少的数据
  • aggregateByKey() 在逻辑上与 reduceByKey() 相同,但它可以让您 return 产生不同的类型。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入,(1,"six") 作为输出。它还需要 零值 将在每个键的开头应用。

注:一个相似点是都是宽运算

While both reducebykey and groupbykey will produce the same answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.

On the other hand, when calling groupByKey - all the key-value pairs are shuffled around. This is a lot of unnessary data to being transferred over the network.

有关详细信息,请查看下面的内容 link

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

虽然两者都将获取相同的结果,但是这两个函数的性能存在显着差异。与 groupByKey().

相比,reduceByKey() 适用于更大的数据集

reduceByKey()中,同一台机器上具有相同密钥的对在数据被打乱之前被合并(通过使用传递给reduceByKey()的函数)。然后再次调用该函数以减少每个分区的所有值以产生一个最终结果。

groupByKey()中,所有键值对都被打乱。这是通过网络传输的大量不必要的数据。

groupByKey:

语法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" ") )
                    .map(word => (word,1))
                    .groupByKey()
                    .map((x,y) => (x,sum(y)))
            

groupByKey 可能会导致磁盘不足问题,因为数据是通过网络发送并在减少的工作人员上收集的。

reduceByKey:

语法:

sparkContext.textFile("hdfs://")
                    .flatMap(line => line.split(" "))
                    .map(word => (word,1))
                    .reduceByKey((x,y)=> (x+y))

数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。 reduceByKey 需要将您的所有值组合成另一个具有完全相同类型的值。

聚合按键:

reduceByKey,取初值

3 个参数作为输入

  1. 初始值
  2. 组合器逻辑
  3. 序列运算逻辑

示例:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
    val data = sc.parallelize(keysWithValuesList)
    //Create key value pairs
    val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
    val initialCount = 0;
    val addToCounts = (n: Int, v: String) => n + 1
    val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
    val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

输出: 按关键和结果聚合 酒吧 - > 3 富 -> 5

组合键:

3 个参数作为输入

  1. 初始值:与aggregateByKey不同,不需要总是传递常量,我们可以传递一个函数来return一个新值。
  2. 合并函数
  3. 组合函数

示例:

val result = rdd.combineByKey(
                        (v) => (v,1),
                        ( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
                        ( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2)) 
                        ).map( { case (k,v) => (k,v._1/v._2.toDouble) })
        result.collect.foreach(println)

reduceByKey,aggregateByKey,combineByKey 优于 groupByKey

参考: Avoid groupByKey

那么除了这4个,我们还有

foldByKey 与 reduceByKey 相同,但具有用户定义的零值。

AggregateByKey 将 3 个参数作为输入并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)

ReduceBykey只有1个参数,是一个合并函数

CombineByKey有3个参数,3个都是函数。类似于 aggregateBykey 除了它可以有一个 ZeroValue 函数。

GroupByKey 不接受任何参数并将所有内容分组。此外,它是跨分区数据传输的开销。