何时使用 countByValue 以及何时使用 map().reduceByKey()
When to use countByValue and when to use map().reduceByKey()
我是 Spark 和 scala 的新手,正在研究一个简单的 wordCount 示例。
为此,我使用 countByValue 如下:
val words = lines.flatMap(x => x.split("\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();
效果很好。
同样的事情可以像这样实现:
val words = lines.flatMap(x => x.split("\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()
这也很好。
现在,我的问题是什么时候使用哪些方法?
哪个比另一个更受欢迎?
这里的例子 - 不是文字,而是数字:
val n = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val n2 = n.countByValue
returns 本地地图:
n: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at command-3737881976236428:1
n2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
这是关键的区别。
如果您想要开箱即用的地图,那么这就是您要走的路。
此外,要点是 reduce 是隐含的,不会受到影响,也不需要像 reduceByKey 那样提供。
当数据量很大时,reduceByKey 有优先权。地图被完整加载到驱动程序内存中。
至少在 PySpark 中,它们是不同的东西。
countByKey
是用 reduce
实现的,这意味着 driver 将收集分区的部分结果并自行合并。如果你的结果很大,那么driver就得合并大量的大字典,这会让driver抓狂。
reduceByKey
将keysshuffle给不同的executor,在每个worker中做reduce,所以数据量大的时候更有利。
总之,当你的数据很大时,使用map
、reduceByKey
和collect
会让你的driver更快乐。如果您的数据很小,countByKey
将引入较少的网络流量(少一个阶段)。
除上述所有答案外,我进一步发现:
CountByValue return 无法以分布式方式使用的映射。
ReduceByKey returns 一个可以进一步以分布式方式使用的 rdd。
- countByValue() 是一个 RDD 动作,returns 每个唯一的计数
此 RDD 中的值作为 (value, count) 对的字典。
- reduceByKey() 是一个 RDD 转换 returns 成对格式的 RDD
我是 Spark 和 scala 的新手,正在研究一个简单的 wordCount 示例。
为此,我使用 countByValue 如下:
val words = lines.flatMap(x => x.split("\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();
效果很好。
同样的事情可以像这样实现:
val words = lines.flatMap(x => x.split("\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()
这也很好。
现在,我的问题是什么时候使用哪些方法? 哪个比另一个更受欢迎?
这里的例子 - 不是文字,而是数字:
val n = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val n2 = n.countByValue
returns 本地地图:
n: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at command-3737881976236428:1
n2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
这是关键的区别。
如果您想要开箱即用的地图,那么这就是您要走的路。
此外,要点是 reduce 是隐含的,不会受到影响,也不需要像 reduceByKey 那样提供。
当数据量很大时,reduceByKey 有优先权。地图被完整加载到驱动程序内存中。
至少在 PySpark 中,它们是不同的东西。
countByKey
是用 reduce
实现的,这意味着 driver 将收集分区的部分结果并自行合并。如果你的结果很大,那么driver就得合并大量的大字典,这会让driver抓狂。
reduceByKey
将keysshuffle给不同的executor,在每个worker中做reduce,所以数据量大的时候更有利。
总之,当你的数据很大时,使用map
、reduceByKey
和collect
会让你的driver更快乐。如果您的数据很小,countByKey
将引入较少的网络流量(少一个阶段)。
除上述所有答案外,我进一步发现:
CountByValue return 无法以分布式方式使用的映射。
ReduceByKey returns 一个可以进一步以分布式方式使用的 rdd。
- countByValue() 是一个 RDD 动作,returns 每个唯一的计数 此 RDD 中的值作为 (value, count) 对的字典。
- reduceByKey() 是一个 RDD 转换 returns 成对格式的 RDD