如何根据Key从PairRDD获取新的RDD
How to get new RDD from PairRDD based on Key
在我的 Spark 应用程序中,我使用了一个 JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
,它有大量数据。
我的要求是我需要一些其他的 RDD JavaRDD<Tuple3<String, String, String>>
来自那个基于键的 Large PairRDD。
我不知道 Java API,但在 Scala 中(在 spark-shell
中)可以这样做:
def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = {
rdd.keys.distinct.collect.map {
key => key -> rdd.filter(_._1 == key).values.flatMap(identity)
}
}
您必须 filter
每个键并用 flatMap
压平 List
。
我不得不提一下,这不是一个有用的操作。如果您能够构建原始 RDD,则意味着每个 List
都足够小以适合内存。所以我不明白你为什么要把它们变成 RDD。
在我的 Spark 应用程序中,我使用了一个 JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
,它有大量数据。
我的要求是我需要一些其他的 RDD JavaRDD<Tuple3<String, String, String>>
来自那个基于键的 Large PairRDD。
我不知道 Java API,但在 Scala 中(在 spark-shell
中)可以这样做:
def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = {
rdd.keys.distinct.collect.map {
key => key -> rdd.filter(_._1 == key).values.flatMap(identity)
}
}
您必须 filter
每个键并用 flatMap
压平 List
。
我不得不提一下,这不是一个有用的操作。如果您能够构建原始 RDD,则意味着每个 List
都足够小以适合内存。所以我不明白你为什么要把它们变成 RDD。