MapReduce example in Scala
I ran into this problem in Scala for a homework assignment.
The idea I had, but could not successfully implement, was:
iterate through each word; if the word is basketball, take the next word and add it to a map. Reduce by key, and sort from highest to lowest.
Unfortunately I do not know how to take the next word in a list of words.
For example, I would like to do something like this:
val lines = spark.textFile("basketball_words_only.txt") // process lines in file
// split into individual words
val words = lines.flatMap(line => line.split(" "))
var listBuff = new ListBuffer[String]() // a list Buffer to hold each following word
val it = Iterator(words)
while (it.hasNext) {
listBuff += it.next().next() // <-- this is what I would like to do
}
val follows = listBuff.map(word => (word, 1))
val count = follows.reduceByKey((x, y) => x + y) // another issue as I cannot reduceByKey with a listBuffer
val sort = count.sortBy(_._2,false,1)
val result2 = sort.collect()
for (i <- 0 to result2.length - 1) {
printf("%s follows %d times\n", result2(i)._1, result2(i)._2);
}
Any help would be greatly appreciated.
This is from https://spark.apache.org/examples.html:
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
As you can see, it counts the occurrences of individual words because the key-value pairs have the form (word, 1). Which part do you need to change to count word combinations instead?
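To make the needed change concrete, here is a minimal sketch using plain Scala collections instead of an RDD (so it runs without Spark, with a made-up input string); the only conceptual change is that the key becomes a word pair:

```scala
// Hedged sketch with plain Scala collections: switch the key from a
// single word to a pair of consecutive words.
val words = "a b a b b a".split(" ").toList

// Original shape: counting single words.
val singleCounts = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }

// Changed shape: counting consecutive word pairs produced by sliding(2).
val pairCounts = words.sliding(2).toList
  .map { case List(w1, w2) => ((w1, w2), 1) }
  .groupBy(_._1)
  .map { case (pair, ps) => (pair, ps.size) }
```

In Spark the groupBy/size step would be the `reduceByKey(_ + _)` from the example above; the shape of the keys is the only part that changes.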
This might help you: http://daily-scala.blogspot.com/2009/11/iteratorsliding.html
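For reference, here is what the sliding from that post does on a small word list (plain Scala, nothing Spark-specific assumed); the "word after basketball" is simply the second element of each window:

```scala
// sliding(2) yields every consecutive pair of words, so the word
// following "basketball" is directly accessible in each window.
val words = List("basketball", "is", "basketball", "fun")
val following = words.sliding(2)
  .collect { case List("basketball", next) => next }
  .toList
// following: List("is", "fun")
```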
You can get the max count of the first word across all distinct word pairs in a few steps:
- Strip punctuation and split the content into lowercased words
- Use sliding(2) to create arrays of word pairs
- Use reduceByKey to count the occurrences of distinct word pairs
- Use reduceByKey again to capture the word pairs with the max count of the first word
Sample code below:
import org.apache.spark.mllib.rdd.RDDFunctions._  // provides sliding on RDDs

val wordPairCountRDD = sc.textFile("/path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  sliding(2).
  map{ case Array(w1, w2) => ((w1, w2), 1) }.
  reduceByKey( _ + _ )
val wordPairMaxRDD = wordPairCountRDD.
  map{ case ((w1, w2), c) => (w1, (w2, c)) }.
  reduceByKey( (acc, x) => if (x._2 > acc._2) x else acc ).
  map{ case (w1, (w2, c)) => ((w1, w2), c) }
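The per-key max in step 4 can be illustrated with plain Scala collections and a small hypothetical input (no SparkContext needed):

```scala
// What the second reduceByKey does: for each first word, keep the
// (secondWord, count) entry with the highest count.
val counted = List((("b", "a"), 2), (("b", "c"), 3), (("a", "b"), 1))
val maxPerFirst = counted
  .map { case ((w1, w2), c) => (w1, (w2, c)) }
  .groupBy(_._1)
  .map { case (w1, xs) => (w1, xs.map(_._2).maxBy(_._2)) }
// maxPerFirst: Map("b" -> ("c", 3), "a" -> ("b", 1))
```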
[Update]
If, per your revised requirement, you only need the word-pair counts sorted in descending order, you can skip step 4 and apply sortBy to wordPairCountRDD:
wordPairCountRDD.
  sortBy( z => (z._2, z._1._1, z._1._2), ascending = false )
Well, my text uses "b" instead of "basketball", and "a" and "c" as the other words.
scala> val r = scala.util.Random
scala> val s = (1 to 20).map (i => List("a", "b", "c")(r.nextInt (3))).mkString (" ")
s: String = c a c b a b a a b c a b b c c a b b c b
The result from split, sliding, filter, map, groupBy, map, and sortBy:
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}.toList.sortBy (-_._2)
counts: List[(Char, Int)] = List((c,3), (b,2), (a,2))
Sliding, step by step:
scala> val counts = s.split (" ").sliding (2).toList
counts: List[Array[String]] = List(Array(c, a), Array(a, c), Array(c, b), Array(b, a), Array(a, b), Array(b, a), Array(a, a), Array(a, b), Array(b, c), Array(c, a), Array(a, b), Array(b, b), Array(b, c), Array(c, c), Array(c, a), Array(a, b), Array(b, b), Array(b, c), Array(c, b))
Filter:
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").toList
counts: List[Array[String]] = List(Array(b, a), Array(b, a), Array(b, c), Array(b, b), Array(b, c), Array(b, b), Array(b, c))
map (_(1)) (accessing the second element of each array):
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList
counts: List[String] = List(a, a, c, b, c, b, c)
groupBy (_(0)):
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0))
counts: scala.collection.immutable.Map[Char,List[String]] = Map(b -> List(b, b), a -> List(a, a), c -> List(c, c, c))
List sizes:
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}
counts: scala.collection.immutable.Map[Char,Int] = Map(b -> 2, a -> 2, c -> 3)
And finally, sorted in descending order:
scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}.toList.sortBy (-_._2)
counts: List[(Char, Int)] = List((c,3), (b,2), (a,2))