在 scala 中使用 reduceByKey 和 case
Using reduceByKey with case in scala
我有这种类型;
column1 column2 int
((a,b),1)
((a,c),1)
((k,a),1)
我需要两种类型的结果,第一种是所有第 2 列的第 1 列的总和
(a,total)
其次,column1 参数不等于 column2 参数
(a,total)
我如何使用 ReduceByKey 对于这种类型?
我的代码:
var data = sc.textFile("tttt.tsv")
var satir = data.map(line=> ((line.split("\t")(1).toString,line.split("\t")(2).toString),1))
---关于第二种情况
我的数据类型示例是
column1 column2 int
a b,1
a c,1
a a,1
a d,1
我需要 reduceByKey 因为 column1 在第二种情况下不等于 column1
例如,我的示例数据结果 = (a,b,1) + (a,c,1)+(a,d,1) = 3
对于第一个场景,您可以使用这个。
val arrangedDF = satir.map(pairData => (pairData._1._1, (pairData._1._2, 1)))
val result1DF = arrangedDF.reduceByKey((x,y) => x._2 + y._2)
这里我重新排列了数据的表示形式,我将键分开,然后将其余数据放入不同的元组中,您可以直接应用reduceByKey
。
对于第二种情况,您可以使用它。
val result2DF = arrangedDF.filter( pairData => pairData._1 != pairData._2._1).reduceByKey((x,y) => x._2 + y._2)
这里是第二个场景,我重复使用 arrangedDF
并根据您想要的条件对其应用过滤器,即 column1 参数不等于 column2 参数,然后应用 reduceByKey
希望我的回答很清楚
谢谢
如果我对你的问题的理解正确,下面是获得你所问内容的一种方法:
val rdd = sc.parallelize(Seq(
(("a", "b"), 1),
(("a", "c"), 1),
(("a", "d"), 1),
(("a", "a"), 1),
(("k", "k"), 1),
(("k", "a"), 1),
(("k", "b"), 1)
))
val rdd1 = rdd.map{ case ((x, y), c) => (x, c) }.
reduceByKey(_ + _)
scala> rdd1.collect.foreach(println)
(a,4)
(k,3)
val rdd2 = rdd.filter{ case ((x, y), c) => x != y }.
map{ case ((x, y), c) => (x, c) }.
reduceByKey(_ + _)
scala> rdd2.collect.foreach(println)
(a,3)
(k,2)
我有这种类型;
column1 column2 int
((a,b),1)
((a,c),1)
((k,a),1)
我需要两种类型的结果,第一种是所有第 2 列的第 1 列的总和
(a,total)
其次,column1 参数不等于 column2 参数
(a,total)
我如何使用 ReduceByKey 对于这种类型?
我的代码:
var data = sc.textFile("tttt.tsv")
var satir = data.map(line=> ((line.split("\t")(1).toString,line.split("\t")(2).toString),1))
---关于第二种情况
我的数据类型示例是
column1 column2 int
a b,1
a c,1
a a,1
a d,1
我需要 reduceByKey 因为 column1 在第二种情况下不等于 column1
例如,我的示例数据结果 = (a,b,1) + (a,c,1)+(a,d,1) = 3
对于第一个场景,您可以使用这个。
val arrangedDF = satir.map(pairData => (pairData._1._1, (pairData._1._2, 1)))
val result1DF = arrangedDF.reduceByKey((x,y) => x._2 + y._2)
这里我重新排列了数据的表示形式,我将键分开,然后将其余数据放入不同的元组中,您可以直接应用reduceByKey
。
对于第二种情况,您可以使用它。
val result2DF = arrangedDF.filter( pairData => pairData._1 != pairData._2._1).reduceByKey((x,y) => x._2 + y._2)
这里是第二个场景,我重复使用 arrangedDF
并根据您想要的条件对其应用过滤器,即 column1 参数不等于 column2 参数,然后应用 reduceByKey
希望我的回答很清楚
谢谢
如果我对你的问题的理解正确,下面是获得你所问内容的一种方法:
val rdd = sc.parallelize(Seq(
(("a", "b"), 1),
(("a", "c"), 1),
(("a", "d"), 1),
(("a", "a"), 1),
(("k", "k"), 1),
(("k", "a"), 1),
(("k", "b"), 1)
))
val rdd1 = rdd.map{ case ((x, y), c) => (x, c) }.
reduceByKey(_ + _)
scala> rdd1.collect.foreach(println)
(a,4)
(k,3)
val rdd2 = rdd.filter{ case ((x, y), c) => x != y }.
map{ case ((x, y), c) => (x, c) }.
reduceByKey(_ + _)
scala> rdd2.collect.foreach(println)
(a,3)
(k,2)