MapReduce example in Scala

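I ran into this problem in Scala as part of a homework assignment. The idea I had, but could not implement successfully, is:

Iterate over each word; if the word is basketball, take the next word and add it to a map. Reduce by key, then sort from highest to lowest.

Unfortunately, I do not know how to get the next word in a list of words. For example, I would like to do something like this: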

val lines = spark.textFile("basketball_words_only.txt") // process lines in file

// split into individual words
val words = lines.flatMap(line => line.split(" "))

var listBuff = new ListBuffer[String]() // a list Buffer to hold each following word

val it = Iterator(words)  

while (it.hasNext) {
  listBuff += it.next().next() // <-- this is what I would like to do    
}

val follows = listBuff.map(word => (word, 1))
val count = follows.reduceByKey((x, y) => x + y) // another issue as I cannot reduceByKey with a listBuffer

val sort = count.sortBy(_._2,false,1)

val result2 = sort.collect()

for (i <- 0 to result2.length - 1) {
  printf("%s follows %d times\n", result2(i)._1, result2(i)._2)
}

Any help would be greatly appreciated.

This is from https://spark.apache.org/examples.html:

val counts = textFile.flatMap(line => line.split(" "))   
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)

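As you can see, it counts occurrences of individual words, because the key-value pairs have the form (word, 1). Which part would you need to change to count word combinations?

This may help you: http://daily-scala.blogspot.com/2009/11/iteratorsliding.html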
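
To illustrate the sliding idea from that link, here is a minimal sketch on plain Scala collections rather than an RDD. The object name SlidingSketch, the helper followers, and the sample sentence are made up for this example.

```scala
object SlidingSketch {
  // Returns every word that directly follows `target` in `words`.
  def followers(words: Seq[String], target: String): List[String] =
    words
      .sliding(2) // windows of two adjacent words
      .collect { case Seq(w1, w2) if w1 == target => w2 }
      .toList

  def main(args: Array[String]): Unit = {
    val words = "i like basketball games and basketball shoes".split(" ").toSeq
    println(followers(words, "basketball")) // List(games, shoes)
  }
}
```

Each window holds a word and its successor, so "the next word" is simply the second element of the window. A one-word input produces a single short window, which the pattern skips.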

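For each distinct first word, you can find the word pair with the highest count in a few steps:

  1. Strip punctuation and split the content into lowercase words
  2. Use sliding(2) to create arrays of word pairs
  3. Use reduceByKey to count the occurrences of each distinct word pair
  4. Use reduceByKey again to keep, for each first word, the word pair with the maximum count

Sample code follows: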

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.rdd.RDDFunctions._

val wordPairCountRDD = sc.textFile("/path/to/textfile").
  flatMap( _.split("""[\s,.;:!?]+""") ).
  map( _.toLowerCase ).
  sliding(2).
  map{ case Array(w1, w2) => ((w1, w2), 1) }.
  reduceByKey( _ + _ )

val wordPairMaxRDD = wordPairCountRDD.
  map{ case ((w1, w2), c) => (w1, (w2, c)) }.
  reduceByKey( (acc, x) =>
    if (x._2 > acc._2) (x._1, x._2) else acc
  ).
  map{ case (w1, (w2, c)) => ((w1, w2), c) }
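
If you want to check the logic without a cluster, the same four steps can be sketched on in-memory collections. This is only an analogue of the RDD code above, not a replacement for it: the object and method names are hypothetical, and groupBy stands in for reduceByKey.

```scala
object PairCountSketch {
  // Steps 1-3: tokenize, pair adjacent words, count each distinct pair.
  def pairCounts(text: String): Map[(String, String), Int] =
    text
      .split("""[\s,.;:!?]+""")
      .filter(_.nonEmpty) // split leaves an empty token if the text starts with a delimiter
      .map(_.toLowerCase)
      .sliding(2)
      .collect { case Array(w1, w2) => (w1, w2) }
      .toList
      .groupBy(identity)
      .map { case (pair, occurrences) => (pair, occurrences.size) }

  // Step 4: for each first word, keep the pair with the highest count.
  // Ties are resolved arbitrarily, as in the reduceByKey version.
  def maxPerFirstWord(counts: Map[(String, String), Int]): Map[String, (String, Int)] =
    counts.toList
      .groupBy { case ((w1, _), _) => w1 }
      .map { case (w1, entries) =>
        val ((_, w2), c) = entries.maxBy(_._2)
        (w1, (w2, c))
      }

  def main(args: Array[String]): Unit = {
    val counts = pairCounts("A b. a b, a c")
    println(counts)
    println(maxPerFirstWord(counts))
  }
}
```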

[Update]

If, per the revised requirement, you only need to sort the word pair counts in descending order, you can skip step 4 and use sortBy on wordPairCountRDD:

wordPairCountRDD.
  sortBy( z => (z._2, z._1._1, z._1._2), ascending = false )
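
Note that ascending = false reverses the whole key tuple, so pairs with equal counts also come out in reverse alphabetical order. A small in-memory sketch of that ordering, assuming plain Scala collections; the names SortSketch, PairCount and sortDescending are made up for illustration:

```scala
object SortSketch {
  type PairCount = ((String, String), Int)

  // Sorts by count, then first word, then second word, all descending,
  // mirroring a descending sort on the whole (count, w1, w2) key.
  def sortDescending(counts: Seq[PairCount]): Seq[PairCount] =
    counts.sortBy { case ((w1, w2), c) => (c, w1, w2) }(
      Ordering[(Int, String, String)].reverse
    )

  def main(args: Array[String]): Unit = {
    val counts = Seq((("a", "b"), 2), (("b", "a"), 2), (("a", "c"), 3))
    sortDescending(counts).foreach(println)
  }
}
```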

Well, my text uses "b" instead of "basketball", and "a" and "c" for the other words.

scala> val r = scala.util.Random 
scala> val s = (1 to 20).map (i => List("a", "b", "c")(r.nextInt (3))).mkString (" ")
s: String = c a c b a b a a b c a b b c c a b b c b

The result from split, sliding, filter, map, groupBy, map, sortBy:

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}.toList.sortBy (-_._2) 
counts: List[(Char, Int)] = List((c,3), (b,2), (a,2))

In small steps, sliding:

scala> val counts = s.split (" ").sliding (2).toList
counts: List[Array[String]] = List(Array(c, a), Array(a, c), Array(c, b), Array(b, a), Array(a, b), Array(b, a), Array(a, a), Array(a, b), Array(b, c), Array(c, a), Array(a, b), Array(b, b), Array(b, c), Array(c, c), Array(c, a), Array(a, b), Array(b, b), Array(b, c), Array(c, b))

filter:

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").toList
counts: List[Array[String]] = List(Array(b, a), Array(b, a), Array(b, c), Array(b, b), Array(b, c), Array(b, b), Array(b, c))

map (_(1)) (array access, the second element):

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList
counts: List[String] = List(a, a, c, b, c, b, c)

groupBy (_(0))

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0))
counts: scala.collection.immutable.Map[Char,List[String]] = Map(b -> List(b, b), a -> List(a, a), c -> List(c, c, c))

The size of each list:

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}
counts: scala.collection.immutable.Map[Char,Int] = Map(b -> 2, a -> 2, c -> 3)

Finally, sort in descending order:

scala> val counts = s.split (" ").sliding (2).filter (_(0) == "b").map (_(1)).toList.groupBy (_(0)).map { case (c: Char, l: List[String]) => (c, l.size)}.toList.sortBy (-_._2) 
counts: List[(Char, Int)] = List((c,3), (b,2), (a,2))
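
One caveat: groupBy (_(0)) keys on the first character of each word. That works here because every word is a single letter, but with real words it would merge, say, "ball" and "bat". A sketch of the same pipeline grouping on the whole word instead; the object name FollowerCounts, the function name, and the alphabetical tie-break are additions for this example:

```scala
object FollowerCounts {
  // Counts the words that directly follow `target`, most frequent first.
  def followerCounts(text: String, target: String): List[(String, Int)] =
    text
      .split(" ")
      .sliding(2)
      .collect { case Array(w1, w2) if w1 == target => w2 }
      .toList
      .groupBy(identity) // group by the whole word, not its first character
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toList
      .sortBy { case (word, count) => (-count, word) } // ties broken alphabetically

  def main(args: Array[String]): Unit = {
    val s = "c a c b a b a a b c a b b c c a b b c b"
    println(followerCounts(s, "b")) // List((c,3), (a,2), (b,2))
  }
}
```

The counts match the REPL session above; only the order of the two tied entries differs, because the tie-break here is explicit.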