在 scala 中使用 reduceByKey 和 case

Using reduceByKey with case in scala

我有这种类型;

 column1 column2 int 
 ((a,b),1)
 ((a,c),1)
 ((k,a),1)

我需要两种类型的结果,第一种是所有第 2 列的第 1 列的总和

(a,total)

其次,column1 参数不等于 column2 参数

(a,total)

我如何使用 ReduceByKey 对于这种类型?

我的代码:

var data = sc.textFile("tttt.tsv")
var satir = data.map(line=> ((line.split("\t")(1).toString,line.split("\t")(2).toString),1))

---关于第二种情况

我的数据类型示例是

column1 column2 int
a b,1 
a c,1
a a,1
a d,1

我需要 reduceByKey 因为 column1 在第二种情况下不等于 column1

例如,我的示例数据结果 = (a,b,1) + (a,c,1)+(a,d,1) = 3

对于第一个场景,您可以使用这个。

val arrangedDF = satir.map(pairData => (pairData._1._1, (pairData._1._2, 1)))
val result1DF = arrangedDF.reduceByKey((x,y) => x._2 + y._2)

这里我重新排列了数据的表示形式,我将键分开,然后将其余数据放入不同的元组中,您可以直接应用reduceByKey

对于第二种情况,您可以使用它。

val result2DF = arrangedDF.filter( pairData => pairData._1 != pairData._2._1).reduceByKey((x,y) => x._2 + y._2)

这里是第二个场景,我重复使用 arrangedDF 并根据您想要的条件对其应用过滤器,即 column1 参数不等于 column2 参数,然后应用 reduceByKey

希望我的回答很清楚

谢谢

如果我对你的问题的理解正确,下面是获得你所问内容的一种方法:

val rdd = sc.parallelize(Seq(
  (("a", "b"), 1),
  (("a", "c"), 1),
  (("a", "d"), 1),
  (("a", "a"), 1),
  (("k", "k"), 1),
  (("k", "a"), 1),
  (("k", "b"), 1)
))

val rdd1 = rdd.map{ case ((x, y), c) => (x, c) }.
  reduceByKey(_ + _)

scala> rdd1.collect.foreach(println)
(a,4)
(k,3)

val rdd2 = rdd.filter{ case ((x, y), c) => x != y }.
  map{ case ((x, y), c) => (x, c) }.
  reduceByKey(_ + _)

scala> rdd2.collect.foreach(println)
(a,3)
(k,2)