如何根据Key从PairRDD获取新的RDD

How to get new RDD from PairRDD based on Key

在我的 Spark 应用程序中,我使用了一个 JavaPairRDD<Integer, List<Tuple3<String, String, String>>>,它有大量数据。

我的要求是我需要一些其他的 RDD JavaRDD<Tuple3<String, String, String>> 来自那个基于键的 Large PairRDD。

我不知道 Java API,但在 Scala 中(在 spark-shell 中)可以这样做:

def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = {
  rdd.keys.distinct.collect.map {
    key => key -> rdd.filter(_._1 == key).values.flatMap(identity)
  }
}

您必须 filter 每个键并用 flatMap 压平 List

我不得不提一下,这不是一个有用的操作。如果您能够构建原始 RDD,则意味着每个 List 都足够小以适合内存。所以我不明白你为什么要把它们变成 RDD。