字数统计(频率) spark rdd scala
word count(frequency) spark rdd scala
如果我有一个 rdd accross cluster 并且我想做字数统计
不仅统计出现次数,
我想获取频率,定义为count/total count
在 scala 中最好和最有效的方法是什么?
如何在一个工作流程中同时进行归约工作和计算总数?
顺便说一句,我知道纯粹的字数统计可以通过这种方式完成。
text_file = spark.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
但是如果我使用聚合有什么区别呢?在 spark 工作流程方面
val result = pairs
.aggregate(Map[String, Int]())((acc, pair) =>
if(acc.contains(pair._1))
acc ++ Map[String, Int]((pair._1, acc(pair._1)+1))
else
acc ++ Map[String, Int]((pair._1, pair._2))
,
(a, b) =>
(a.toSeq ++ b.toSeq)
.groupBy(_._1)
.mapValues(_.map(_._2).reduce(_ + _))
)
你可以用这个
val total = counts.map(x => x._2).sum()
val freq = counts.map(x => (x._1, x._2/total))
还有 Accumulator 的概念,它是一个 write-only 变量,您可以使用它来避免使用 sum()
操作,但是您的代码需要很多改变。
如果我有一个 rdd accross cluster 并且我想做字数统计
不仅统计出现次数, 我想获取频率,定义为count/total count
在 scala 中最好和最有效的方法是什么? 如何在一个工作流程中同时进行归约工作和计算总数?
顺便说一句,我知道纯粹的字数统计可以通过这种方式完成。
text_file = spark.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
但是如果我使用聚合有什么区别呢?在 spark 工作流程方面
val result = pairs
.aggregate(Map[String, Int]())((acc, pair) =>
if(acc.contains(pair._1))
acc ++ Map[String, Int]((pair._1, acc(pair._1)+1))
else
acc ++ Map[String, Int]((pair._1, pair._2))
,
(a, b) =>
(a.toSeq ++ b.toSeq)
.groupBy(_._1)
.mapValues(_.map(_._2).reduce(_ + _))
)
你可以用这个
val total = counts.map(x => x._2).sum()
val freq = counts.map(x => (x._1, x._2/total))
还有 Accumulator 的概念,它是一个 write-only 变量,您可以使用它来避免使用 sum()
操作,但是您的代码需要很多改变。