reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异
Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey
谁能解释一下 reducebykey
、groupbykey
、aggregatebykey
和 combinebykey
之间的区别?我已经阅读了有关此的文档,但无法理解确切的区别。
最好能有例子说明。
ReduceByKey reduceByKey(func, [numTasks])
-
合并数据后,每个分区的每个键至少应有一个值。
然后洗牌发生,它通过网络发送到某个特定的执行者以执行一些操作,例如减少。
GroupByKey - groupByKey([numTasks])
它不会合并键的值,而是直接进行洗牌过程
这里有很多数据被发送到每个分区,几乎与初始数据相同。
每个键值的合并是在洗牌之后完成的。
这里有大量数据存储在最终工作节点上,因此导致内存不足问题。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
它类似于 reduceByKey 但您可以在执行聚合时提供初始值。
使用reduceByKey
reduceByKey
可以在我们运行大数据集的时候使用。
reduceByKey
当输入输出值类型相同时
超过 aggregateByKey
此外,建议不要使用 groupByKey
而更喜欢 reduceByKey
。详情可以参考here.
您也可以参考此 question 以更详细地了解 reduceByKey
和 aggregateByKey
。
groupByKey()
只是根据键对数据集进行分组。当 RDD 尚未分区时,将导致数据混洗。
reduceByKey()
类似于分组+聚合。我们可以说 reduceByKey()
等同于 dataset.group(...).reduce(...)。与 groupByKey()
. 不同,它会随机播放更少的数据
aggregateByKey()
在逻辑上与 reduceByKey()
相同,但它可以让您 return 产生不同的类型。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入,(1,"six") 作为输出。它还需要 零值 将在每个键的开头应用。
注:一个相似点是都是宽运算
While both reducebykey and groupbykey will produce the same answer, the
reduceByKey example works much better on a large dataset. That's
because Spark knows it can combine output with a common key on each
partition before shuffling the data.
On the other hand, when calling groupByKey - all the key-value pairs
are shuffled around. This is a lot of unnessary data to being
transferred over the network.
有关详细信息,请查看下面的内容 link
虽然两者都将获取相同的结果,但是这两个函数的性能存在显着差异。与 groupByKey()
.
相比,reduceByKey()
适用于更大的数据集
在reduceByKey()
中,同一台机器上具有相同密钥的对在数据被打乱之前被合并(通过使用传递给reduceByKey()
的函数)。然后再次调用该函数以减少每个分区的所有值以产生一个最终结果。
在groupByKey()
中,所有键值对都被打乱。这是通过网络传输的大量不必要的数据。
groupByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
可能会导致磁盘不足问题,因为数据是通过网络发送并在减少的工作人员上收集的。
reduceByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。 reduceByKey
需要将您的所有值组合成另一个具有完全相同类型的值。
聚合按键:
同reduceByKey
,取初值
3 个参数作为输入
- 初始值
- 组合器逻辑
- 序列运算逻辑
示例:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出:
按关键和结果聚合
酒吧 - > 3
富 -> 5
组合键:
3 个参数作为输入
- 初始值:与
aggregateByKey
不同,不需要总是传递常量,我们可以传递一个函数来return一个新值。
- 合并函数
- 组合函数
示例:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey
,aggregateByKey
,combineByKey
优于 groupByKey
参考:
Avoid groupByKey
那么除了这4个,我们还有
foldByKey 与 reduceByKey 相同,但具有用户定义的零值。
AggregateByKey 将 3 个参数作为输入并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)
而
ReduceBykey只有1个参数,是一个合并函数
CombineByKey有3个参数,3个都是函数。类似于 aggregateBykey 除了它可以有一个 ZeroValue 函数。
GroupByKey 不接受任何参数并将所有内容分组。此外,它是跨分区数据传输的开销。
谁能解释一下 reducebykey
、groupbykey
、aggregatebykey
和 combinebykey
之间的区别?我已经阅读了有关此的文档,但无法理解确切的区别。
最好能有例子说明。
ReduceByKey reduceByKey(func, [numTasks])
-
合并数据后,每个分区的每个键至少应有一个值。 然后洗牌发生,它通过网络发送到某个特定的执行者以执行一些操作,例如减少。
GroupByKey - groupByKey([numTasks])
它不会合并键的值,而是直接进行洗牌过程 这里有很多数据被发送到每个分区,几乎与初始数据相同。
每个键值的合并是在洗牌之后完成的。 这里有大量数据存储在最终工作节点上,因此导致内存不足问题。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
它类似于 reduceByKey 但您可以在执行聚合时提供初始值。
使用reduceByKey
reduceByKey
可以在我们运行大数据集的时候使用。reduceByKey
当输入输出值类型相同时 超过aggregateByKey
此外,建议不要使用 groupByKey
而更喜欢 reduceByKey
。详情可以参考here.
您也可以参考此 question 以更详细地了解 reduceByKey
和 aggregateByKey
。
groupByKey()
只是根据键对数据集进行分组。当 RDD 尚未分区时,将导致数据混洗。reduceByKey()
类似于分组+聚合。我们可以说reduceByKey()
等同于 dataset.group(...).reduce(...)。与groupByKey()
. 不同,它会随机播放更少的数据
aggregateByKey()
在逻辑上与reduceByKey()
相同,但它可以让您 return 产生不同的类型。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入,(1,"six") 作为输出。它还需要 零值 将在每个键的开头应用。
注:一个相似点是都是宽运算
While both reducebykey and groupbykey will produce the same answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.
On the other hand, when calling groupByKey - all the key-value pairs are shuffled around. This is a lot of unnessary data to being transferred over the network.
有关详细信息,请查看下面的内容 link
虽然两者都将获取相同的结果,但是这两个函数的性能存在显着差异。与 groupByKey()
.
reduceByKey()
适用于更大的数据集
在reduceByKey()
中,同一台机器上具有相同密钥的对在数据被打乱之前被合并(通过使用传递给reduceByKey()
的函数)。然后再次调用该函数以减少每个分区的所有值以产生一个最终结果。
在groupByKey()
中,所有键值对都被打乱。这是通过网络传输的大量不必要的数据。
groupByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
可能会导致磁盘不足问题,因为数据是通过网络发送并在减少的工作人员上收集的。
reduceByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。 reduceByKey
需要将您的所有值组合成另一个具有完全相同类型的值。
聚合按键:
同reduceByKey
,取初值
3 个参数作为输入
- 初始值
- 组合器逻辑
- 序列运算逻辑
示例:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出: 按关键和结果聚合 酒吧 - > 3 富 -> 5
组合键:
3 个参数作为输入
- 初始值:与
aggregateByKey
不同,不需要总是传递常量,我们可以传递一个函数来return一个新值。 - 合并函数
- 组合函数
示例:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( { case (k,v) => (k,v._1/v._2.toDouble) })
result.collect.foreach(println)
reduceByKey
,aggregateByKey
,combineByKey
优于 groupByKey
参考: Avoid groupByKey
那么除了这4个,我们还有
foldByKey 与 reduceByKey 相同,但具有用户定义的零值。
AggregateByKey 将 3 个参数作为输入并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)
而
ReduceBykey只有1个参数,是一个合并函数
CombineByKey有3个参数,3个都是函数。类似于 aggregateBykey 除了它可以有一个 ZeroValue 函数。
GroupByKey 不接受任何参数并将所有内容分组。此外,它是跨分区数据传输的开销。