如何使用 Spark RDD 生成或映射到另一个 RDD

How to work with a Spark RDD to produce or map to another RDD

我有一个 Key/Value RDD 我想获取其中 "iterate over" 中的实体,Key/Value,然后创建或映射到另一个可能具有更多或更少的 RDD第一个 RDD.

的条目

示例:

我积累了代表对绘画色彩观察的记录。 观察 entity/object 包含有关绘画名称和绘画中颜色的数据。

Observation
public String getPaintingName() {return paintingName;}
public List<String> getObservedColors() {return colorList}

我将 accumulo 中的观察作为 RDD 提取到我的代码中。

val observationRDD: RDD[(Text, Observation)] = getObservationsFromAccumulo();

我想使用这个 RDD 并创建一个形式为 (Color, paintingName) 的 RDD,其中键是观察到的颜色,值是观察到颜色的绘画名称。

 val colorToPaintingRDD: RDD[(String, String)] = observationRDD.somefunction({ case (_, observation) =>
    for(String color : observations.getObservedColors()) {
       // Some how output a entry into a new RDD
       //output/map (color, observation.getPaintingName)
 })

我知道地图无法工作,因为它是 1 对 1,我想也许 observationRDD.flatmap(某些功能)但似乎找不到任何示例来说明如何创建一个新的,更大或更小,RDD。

有人可以帮助我并告诉我平面图是否正确吗?如果正确,请使用我提供的这个示例给我一个示例,或者告诉我我是否偏离了基准?

请理解这只是一个简单的例子,这不是我要问的内容,而是如何将 RDD 转换为具有更多或更少条目的 RDD。

您应该为 RDD 中的每个元素使用 flatmap 和 return List[(String, String)]。 FlatMap 将使结果变平,你会得到一个 RDD[(String, String)]

我没有尝试代码,但它会是这样的:

val colorToPaintingRDD: RDD[(String, String)] = observationRDD.flatMap { case (_, observation) =>
    observations.getObservedColors().map(color => (color, observation.getPaintingName))
}

可能如果 getObservedColors 方法在 Java 中,您必须导入 JavaConversions 并更改为 scala 列表。

import scala.collection.JavaConversions._
observations.getObservedColors().toList