访问元组内的元组以在 Spark 中进行匿名地图作业

acces tuple inside a tuple for anonymous map job in Spark

This post is essentially about how to build joint and marginal histograms from a (String, String) RDD. I posted the code that I eventually used below as the answer.

我有一个 RDD,其中包含一组 (String,String) 类型的元组,并且由于它们不是唯一的,所以我想看看每个字符串、字符串组合出现了多少次,所以我像这样使用 countByValue

val PairCount = Pairs.countByValue().toSeq

这给了我一个元组作为这样的输出 ((String,String),Long) 其中 long 是 (String, String) 元组出现的次数

这些字符串可以以不同的组合重复,我基本上想 运行 对这个 PairCount 变量进行字数统计,所以我尝试了这样的事情来开始:

PairCount.map(x => (x._1._1, x._2))

但是这个吐出的输出是String1->1,String2->1,String3->1,等等

在这种情况下,如何从映射作业输出键值对,其中键将是内部元组中的字符串值之一,而值将是外部元组中的 Long 值元组?

更新: @vitalii 让我快到了。答案让我得到一个 Seq[(String,Long)],但我真正需要的是将其转换为一个映射,以便之后我可以 运行 reduceByKey 它。当我 运行

PairCount.flatMap{case((x,y),n) => Seq[x->n]}.toMap

对于每个 unique x 我得到 x->1

例如上面的代码行生成 mom->1 dad->1 即使 flatMap 中的元组包含 (mom,30) (dad,59) (mom,2) (dad,14)在这种情况下,我希望 toMap 提供 mom->30,dad->59 mom->2 dad->14。但是,我是 scala 的新手,所以我可能误解了它的功能。

如何将 Tuple2 序列转换为映射,以便减少映射键?

如果我正确理解问题,你需要flatMap:

val pairCountRDD = pairs.countByValue() // RDD[((String, String), Int)]
val res : RDD[(String, Int)] = pairCountRDD.flatMap { case ((s1, s2), n) =>
   Seq(s1 -> n, s2 -> n)
}

更新:我不太明白你的最终目标是什么,但这里有几个例子可能对你有帮助,顺便说一句,上面的代码不正确,我错过了事实上 countByValue returns 映射,而不是 RDD:

val pairs = sc.parallelize(
  List(
    "mom"-> "dad", "dad" -> "granny", "foo" -> "bar", "foo" -> "baz", "foo" -> "foo"
  )
)
// don't use countByValue, if pairs is large you will run out of memmory
val pairCountRDD = pairs.map(x => (x, 1)).reduceByKey(_ + _) 

val wordCount = pairs.flatMap { case (a,b) => Seq(a -> 1, b ->1)}.reduceByKey(_ + _)

wordCount.take(10)

// count in how many pairs each word occur, keys and values:
val wordPairCount = pairs.flatMap { case (a,b) => 
               if (a == b) {
                 Seq(a->1)
               } else {
                  Seq(a -> 1, b ->1)
               }
             }.reduceByKey(_ + _)
wordPairCount.take(10)

为了获取 (String,String) RDD 的直方图,我使用了这段代码。

val Hist_X  = histogram.map(x => (x._1-> 1.0)).reduceByKey(_+_).collect().toMap
val Hist_Y  = histogram.map(x => (x._2-> 1.0)).reduceByKey(_+_).collect().toMap
val Hist_XY = histogram.map(x => (x-> 1.0)).reduceByKey(_+_)

其中直方图是 (String,String) RDD