Per-Document Word Count in Spark

I'm learning Spark (in Scala) and have been trying to figure out how to count all of the words on each line of a file. I'm working with a dataset where each line contains a tab-separated document_id and the full text of the document:

doc_1   <full-text>
doc_2   <full-text>
etc..

Here is my toy example, in a file called doc.txt:

doc_1   new york city new york state
doc_2   rain rain go away

What I think I need to do is a transformation into tuples of the form

((doc_id, word), 1)

and then call reduceByKey() to sum the 1's. I wrote the following:

val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
            .map( x => x(1).split("\\s+")
            .map( y => ((x(0), y), 1) ) )

which does give me what I think is the intermediate representation I need:

tuples.collect

res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))

But calling reduceByKey on tuples produces an error:

tuples.reduceByKey(_ + _)
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
              tuples.reduceByKey(_ + _)

I can't seem to grasp how to do this. I *think* I need to do a reduce over the arrays inside the array. I've tried quite a few different things but keep getting errors like the one above and making no progress. Any guidance/advice on this would be greatly appreciated.

Note: I know there is a word-count example at https://spark.apache.org/examples.html that shows how to count all of the words in a file. But that is for the entire input file. I'm talking about getting the counts per document, where each document is on a different line.

Here is a quick demo on a very small dataset.

scala> val file = sc.textFile("../README.md")
15/02/02 00:32:38 INFO MemoryStore: ensureFreeSpace(32792) called with curMem=45512, maxMem=278302556
15/02/02 00:32:38 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 32.0 KB, free 265.3 MB)
file: org.apache.spark.rdd.RDD[String] = ../README.md MappedRDD[7] at textFile at <console>:12

scala> val splitLines = file.map{ line => line.split(" ") } 
splitLines: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[9] at map at <console>:14

scala> splitLines.map{ arr => arr.toList.groupBy(identity).map{ x => (x._1, x._2.size) } }
res19: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MappedRDD[10] at map at <console>:17

scala> val result = splitLines.map{ arr => arr.toList.groupBy(identity).map{ x => (x._1, x._2.size) } }
result: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MappedRDD[11] at map at <console>:16

scala> result.take(10).foreach(println)

Map(# -> 1, Spark -> 1, Apache -> 1)
Map( -> 1)
Map(for -> 1, is -> 1, Data. -> 1, system -> 1, a -> 1, provides -> 1, computing -> 1, cluster -> 1, general -> 1, Spark -> 1, It -> 1, fast -> 1, Big -> 1, and -> 1)
Map(in -> 1, Scala, -> 1, optimized -> 1, APIs -> 1, that -> 1, Java, -> 1, high-level -> 1, an -> 1, Python, -> 1, and -> 2, engine -> 1)
Map(for -> 1, data -> 1, a -> 1, also -> 1, general -> 1, supports -> 2, It -> 1, graphs -> 1, analysis. -> 1, computation -> 1)
Map(for -> 1, set -> 1, tools -> 1, rich -> 1, Spark -> 1, structured -> 1, including -> 1, of -> 1, and -> 1, higher-level -> 1, SQL -> 2)
Map(GraphX -> 1, for -> 2, processing, -> 2, data -> 1, MLlib -> 1, learning, -> 1, machine -> 1, graph -> 1)
Map(for -> 1, Streaming -> 1, processing. -> 1, stream -> 1, Spark -> 1, and -> 1)
Map( -> 1)
Map(<http://spark.apache.org/> -> 1)
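
The same per-line groupBy approach can be adapted to the tab-separated format in the question by keeping the doc_id as the key. Here is a minimal sketch (assuming the toy data from the question is in docs.txt, the file name used in the question's code; map ordering in the output may differ):

//Key each line by its doc_id, then count the words of that line only
val docs = sc.textFile("docs.txt").map(_.split("\t"))
val perDocCounts = docs.map{ arr =>
  (arr(0), arr(1).split("\\s+").toList.groupBy(identity).map{ x => (x._1, x._2.size) })
}
perDocCounts.collect.foreach(println)
//(doc_1,Map(new -> 2, york -> 2, city -> 1, state -> 1))
//(doc_2,Map(rain -> 2, go -> 1, away -> 1))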

reduceByKey expects type RDD[(K,V)], whereas the instant you perform the split in your first map you end up with an RDD[Array[...]], which is not the type signature needed. You can modify your current solution as below... but it will probably not be as performant (read on for the code reworked to use flatMap):

//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))  

//Split the data on tabs to get an array of (key, line) tuples
val firstPass = file.map(_.split("\t"))

//Split the line inside each tuple so you now have an array of (key, Array(...)) 
//Where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1))))

//Now group the words and re-map so that the inner tuple is the wordcount
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))
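
For reference, collecting finalPass on the dummy data above should print something along these lines (a sketch only; map ordering may differ):

finalPass.collect.foreach(println)
//(doc_1,Map(new -> 1, york -> 1, city -> 1))
//(doc_2,Map(rain -> 2, go -> 1, away -> 1))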

Probably the better solution vvvv:

If you want to keep your current structure, then you need to change to using a Tuple2 from the start and then use a flatMap after:

//Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))
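
At this point tuples is an RDD[((String, String), Int)], so the reduceByKey from the question works as intended. A quick sketch of the final step (output order may differ):

val counts = tuples.reduceByKey(_ + _)
counts.collect.foreach(println)
//((doc_1,new),1)
//((doc_1,york),1)
//((doc_1,city),1)
//((doc_2,rain),2)
//((doc_2,go),1)
//((doc_2,away),1)
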
    Input file content:
    a b c d
    a b e f 
    h i j l
    m h i l

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> textFile = javaSparkContext.textFile("C:\\Users\\arun7.gupta\\Desktop\\Spark\\word.txt");

        /** Split each line on spaces. */
        textFile = textFile.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        /** Pair each word with a count of 1. */
        JavaPairRDD<String, Integer> mapToPair = textFile.mapToPair(w -> new Tuple2<>(w, 1));

        /** Reduce by key, adding up the counts. */
        JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey((wc1, wc2) -> wc1 + wc2);

        System.out.println(reduceByKey.collectAsMap());
        javaSparkContext.close();
    }

Output:
{e=1, h=2, b=2, j=1, m=1, d=1, a=2, i=2, c=1, l=2, f=1}
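
Note that, like the example linked in the question, this counts words across the entire input rather than per document; to get per-document counts, the pairs would need to be keyed by (document id, word) before the reduceByKey, as in the Scala answers above.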