Spark - 两个键值对 RDD 的求和值
Spark - Sum values of two key-value pair RDDs
我有两个文件A和B,其内容如下:
一个
brown i like
big is house
jumps over lazy
B
this is my house
my house is brown
brown is color
我想分别计算每个文件中每个单词的出现次数,然后对结果求和以获得两个文件中所有单词的计数,即如果一个单词在两个文件中都出现,那么它的最终计数将是分别是它在两个文件中的总计数。
以下是我到目前为止编写的代码:
val readme = sc.textFile("A.txt")
val readmesplit = readme.flatMap(line => line.split(" "))
val changes = sc.textFile("B.txt")
val changessplit = changes.flatMap(line => line.split(" "))
val readmeKV = readmesplit.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val changesKV = changessplit.map(x => (x,1)).reduceByKey((x, y) => x + y)
val ans = readmeKV.fullOuterJoin(changesKV).collect()
此代码给出以下输出:
(this,(Some(1),None)), (is,(Some(3),Some(1))), (big,(None,Some(1))),
(lazy,(None,Some(1))), (house,(Some(2),Some(1))), (over,(None,Some(1)))...and so on
现在如何对每个键的值元组求和以获得两个文件中每个单词的出现次数。
val totals = ans.map {
case (word, (one, two)) => (word, one.getOrElse(0) + two.getOrElse(0))
}
只需提取这两个值,如果单词不存在则返回 0,然后将结果相加。
您是否尝试过使用 union
而不是 fullOuterJoin
? :
val ans = readmesplit.union(changessplit).map(x => (x,1)).reduceByKey((x, y) => x + y)
我有两个文件A和B,其内容如下:
一个
brown i like
big is house
jumps over lazy
B
this is my house
my house is brown
brown is color
我想分别计算每个文件中每个单词的出现次数,然后对结果求和以获得两个文件中所有单词的计数,即如果一个单词在两个文件中都出现,那么它的最终计数将是分别是它在两个文件中的总计数。
以下是我到目前为止编写的代码:
val readme = sc.textFile("A.txt")
val readmesplit = readme.flatMap(line => line.split(" "))
val changes = sc.textFile("B.txt")
val changessplit = changes.flatMap(line => line.split(" "))
val readmeKV = readmesplit.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val changesKV = changessplit.map(x => (x,1)).reduceByKey((x, y) => x + y)
val ans = readmeKV.fullOuterJoin(changesKV).collect()
此代码给出以下输出:
(this,(Some(1),None)), (is,(Some(3),Some(1))), (big,(None,Some(1))),
(lazy,(None,Some(1))), (house,(Some(2),Some(1))), (over,(None,Some(1)))...and so on
现在如何对每个键的值元组求和以获得两个文件中每个单词的出现次数。
val totals = ans.map {
case (word, (one, two)) => (word, one.getOrElse(0) + two.getOrElse(0))
}
只需提取这两个值,如果单词不存在则返回 0,然后将结果相加。
您是否尝试过使用 union
而不是 fullOuterJoin
? :
val ans = readmesplit.union(changessplit).map(x => (x,1)).reduceByKey((x, y) => x + y)