Per-Document Word Count in Spark
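I'm learning Spark (in Scala) and have been trying to figure out how to count all the words on each line of a file.
I'm working with a dataset where each line contains a tab-separated document_id and the full text of the document:
doc_1 <full-text>
doc_2 <full-text>
etc.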
Here is my toy example, in a file named docs.txt:
doc_1 new york city new york state
doc_2 rain rain go away
I think what I need to do is transform each line into tuples of the form
((doc_id, word), 1)
and then call reduceByKey() to sum the 1s. I wrote the following:
val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
.map( x => (x(1).split("\\s+")
.map(y => ((x(0), y), 1 )) ) )
This does give me the intermediate representation I think I need:
tuples.collect
res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))
But calling reduceByKey on tuples produces an error:
tuples.reduceByKey(_ + _)
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
tuples.reduceByKey(_ + _)
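I can't figure out how to do this. I *think* I need to reduce over the arrays inside the outer array. I've tried many different things, but I keep getting errors like the one above and am not making any progress.
Any guidance or suggestions would be much appreciated.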
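Note: I know there is a word count example at https://spark.apache.org/examples.html showing how to count all the words in a file. But that counts over the entire input file. I want the counts for each document, where each document is on its own line.
Here is a quick demo on a very small dataset.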
scala> val file = sc.textFile("../README.md")
file: org.apache.spark.rdd.RDD[String] = ../README.md MappedRDD[7] at textFile at <console>:12
scala> val splitLines = file.map{ line => line.split(" ") }
splitLines: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[9] at map at <console>:14
scala> val result = splitLines.map{ arr => arr.toList.groupBy(identity).map{ x => (x._1, x._2.size) } }
result: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MappedRDD[11] at map at <console>:16
scala> result.take(10).foreach(println)
Map(# -> 1, Spark -> 1, Apache -> 1)
Map( -> 1)
Map(for -> 1, is -> 1, Data. -> 1, system -> 1, a -> 1, provides -> 1, computing -> 1, cluster -> 1, general -> 1, Spark -> 1, It -> 1, fast -> 1, Big -> 1, and -> 1)
Map(in -> 1, Scala, -> 1, optimized -> 1, APIs -> 1, that -> 1, Java, -> 1, high-level -> 1, an -> 1, Python, -> 1, and -> 2, engine -> 1)
Map(for -> 1, data -> 1, a -> 1, also -> 1, general -> 1, supports -> 2, It -> 1, graphs -> 1, analysis. -> 1, computation -> 1)
Map(for -> 1, set -> 1, tools -> 1, rich -> 1, Spark -> 1, structured -> 1, including -> 1, of -> 1, and -> 1, higher-level -> 1, SQL -> 2)
Map(GraphX -> 1, for -> 2, processing, -> 2, data -> 1, MLlib -> 1, learning, -> 1, machine -> 1, graph -> 1)
Map(for -> 1, Streaming -> 1, processing. -> 1, stream -> 1, Spark -> 1, and -> 1)
Map( -> 1)
Map(<http://spark.apache.org/> -> 1)
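The demo above splits each README line on single spaces, so it has no document id. The sketch below adapts the same `groupBy(identity)` idea to the question's `doc_id<TAB>text` lines and returns one `(doc_id, Map(word -> count))` pair per line. It also filters out empty tokens, which is where entries like `Map( -> 1)` in the output above come from. A plain `Seq[String]` stands in for the `RDD[String]`, and the names `PerLineCounts`, `countLine` and `countAll` are hypothetical. It is a sketch, not a tested Spark job, but the same function could presumably be passed to `file.map(...)` on an `RDD[String]`.

```scala
object PerLineCounts {
  // Counts the words of one "doc_id<TAB>text" line, using the same
  // groupBy(identity) idea as the REPL demo.
  def countLine(line: String): (String, Map[String, Int]) = {
    val parts = line.split("\t", 2)
    val docId = parts(0)
    val text = if (parts.length > 1) parts(1) else ""
    val counts = text
      .split("\\s+")
      .filter(_.nonEmpty) // drop the "" token produced by blank text or leading spaces
      .toList
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
    (docId, counts)
  }

  // Local stand-in for rdd.map(countLine): a Seq plays the role of the RDD.
  def countAll(lines: Seq[String]): Seq[(String, Map[String, Int])] =
    lines.map(countLine)
}
```

Everything happens inside a single line here, so no shuffle is involved. The catch is that each document's word map must fit in memory on one executor.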
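reduceByKey expects an RDD[(K,V)]. When you call split inside your first map, you end up with an RDD[Array[...]] instead, which does not have the required type. You can rework your current solution as follows, but it probably won't be as efficient (see the flatMap-based rework after this code):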
//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Split the data on tabs to get an RDD of Array(key, line)
val firstPass = file.map(_.split("\t"))
//Split the line inside each array so you now have (key, Array(...)) pairs,
//where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1))))
//Now group the words within each line and re-map to (key, Map(word -> count))
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))
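The type mismatch explained at the start of this answer can be reproduced without Spark. Plain Scala collections have the same `map`/`flatMap` semantics as RDDs: `map` produces one output element per line, while `flatMap` concatenates the per-line arrays. The object and helper names below (`MapVsFlatMap`, `pairs`) are hypothetical, and a `Seq` stands in for the RDD.

```scala
object MapVsFlatMap {
  val lines: Seq[String] = Seq("doc_1\tnew york city", "doc_2\train rain go away")

  // Turns one "doc_id<TAB>text" line into its ((doc_id, word), 1) pairs.
  // Assumes every line contains a tab.
  def pairs(line: String): Array[((String, String), Int)] = {
    val parts = line.split("\t", 2)
    parts(1).split("\\s+").map(word => ((parts(0), word), 1))
  }

  // map: one output element per input line, and each element is an Array.
  // Local analogue of RDD[Array[((String, String), Int)]], which has no reduceByKey.
  val nested: Seq[Array[((String, String), Int)]] = lines.map(line => pairs(line))

  // flatMap: the per-line arrays are concatenated, one element per word.
  // Local analogue of RDD[((String, String), Int)], the (K, V) shape reduceByKey expects.
  val flat: Seq[((String, String), Int)] = lines.flatMap(line => pairs(line).toSeq)
}
```

`nested` has two elements (one array per document), while `flat` has seven (one pair per word). Only the second shape gets the pair-RDD operations such as `reduceByKey` in Spark.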
A possibly better solution:
If you want to keep your current structure, you need to switch to a Tuple2 from the start and then use flatMap afterwards:
//Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
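val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))
//tuples is now an RDD[((String, String), Int)], so reduceByKey is available
val counts = tuples.reduceByKey(_ + _)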
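Below is a local sketch of the whole flatMap pipeline from this answer, followed by the sum-by-key step the question asked about. `reduceByKeySum` is a hypothetical stand-in for `reduceByKey(_ + _)`. It groups an in-memory `Seq` by key and sums the values, whereas Spark combines values per partition and then shuffles them across the cluster. The names are illustrative only.

```scala
object PerDocWordCount {
  // Local stand-in for reduceByKey(_ + _): group the pairs by key and sum the values.
  def reduceByKeySum[K](pairs: Seq[(K, Int)]): Map[K, Int] =
    pairs.groupBy(_._1).map { case (key, group) => (key, group.map(_._2).sum) }

  def count(lines: Seq[String]): Map[(String, String), Int] = {
    // Same shape as the answer's firstPass: (doc_id, text). Assumes every line has a tab.
    val firstPass = lines.map { line =>
      val parts = line.split("\t", 2)
      (parts(0), parts(1))
    }
    // Same shape as the answer's tuples: ((doc_id, word), 1), flattened by flatMap.
    val tuples = firstPass.flatMap { case (docId, text) =>
      text.split("\\s+").toSeq.map(word => ((docId, word), 1))
    }
    reduceByKeySum(tuples)
  }
}
```

On the question's toy file, this gives `("doc_1", "new") -> 2`, `("doc_1", "york") -> 2` and `("doc_2", "rain") -> 2`, and every other (document, word) pair gets a count of 1.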
Input file content:
a b c d
a b e f
h i j l
m h i l
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // Backslashes in a Windows path must be escaped in a Java string literal
        JavaRDD<String> textFile = javaSparkContext.textFile("C:\\Users\\arun7.gupta\\Desktop\\Spark\\word.txt");
        /**Split each line into words on spaces*/
        textFile = textFile.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        /**Pair each word with a count of 1*/
        JavaPairRDD<String, Integer> mapToPair = textFile.mapToPair(w -> new Tuple2<>(w, 1));
        /**Reduce by key, adding up the counts*/
        JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey((wc1, wc2) -> wc1 + wc2);
        System.out.println(reduceByKey.collectAsMap());
        javaSparkContext.close();
    }
}
Output:
{e=1, h=2, b=2, j=1, m=1, d=1, a=2, i=2, c=1, l=2, f=1}
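The Java program above counts words over the whole file, because its key is the word alone. For the per-document counts the question asks for, the only change needed is the key: use (doc_id, word) instead of word. The Scala sketch below contrasts the two key choices on in-memory data. `KeyChoice`, `sumByKey` and the doc ids `d1`/`d2` are hypothetical, and `sumByKey` is a local stand-in for `reduceByKey(_ + _)`.

```scala
object KeyChoice {
  // Local stand-in for reduceByKey(_ + _).
  private def sumByKey[K](pairs: Seq[(K, Int)]): Map[K, Int] =
    pairs.groupBy(_._1).map { case (key, group) => (key, group.map(_._2).sum) }

  // Whole-file count, as in the Java answer: the key is the word alone.
  def wholeFile(lines: Seq[String]): Map[String, Int] =
    sumByKey(lines.flatMap(line => line.split(" ").toSeq).map(word => (word, 1)))

  // Per-document count: the key is (doc_id, word), so the same word in two
  // documents yields two separate counts. Assumes "doc_id<TAB>text" lines.
  def perDocument(lines: Seq[String]): Map[(String, String), Int] =
    sumByKey(lines.flatMap { line =>
      val parts = line.split("\t", 2)
      parts(1).split(" ").toSeq.map(word => ((parts(0), word), 1))
    })
}
```

With the four input lines above, `wholeFile` reproduces the Java output (for example `a -> 2`). With `perDocument`, the two `a`s in documents `d1` and `d2` stay separate as `("d1", "a") -> 1` and `("d2", "a") -> 1`.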