Apache spark 在 RDD 上应用映射转换

Apache spark applying map transformation on RDDs

我有一个 HadoopRDD,我正在使用它创建第一个具有简单 Map 函数的 RDD,然后使用另一个简单的 Map 函数从第一个 RDD 创建第二个 RDD。像 :

HadoopRDD -> RDD1 -> RDD2。

我的问题是,Spak 是逐条记录地遍历 HadoopRDD 生成 RDD1,然后逐条记录地遍历 RDD1 生成 RDD2,还是遍历 HadoopRDD,然后一次性生成 RDD1 和 RDD2。

Apache Spark 将以无特定顺序逐个记录地迭代 HadoopRDD 记录(数据将被拆分并发送给工作人员)和 "apply" 计算 RDD1 的第一个转换。之后,第二次转换应用于 RDD1 的每个元素以得到 RDD2,同样没有特定的顺序,依此类推连续转换。您可以从 map 方法签名中注意到它:

// Return a new RDD by applying a function to all elements of this RDD.
def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]

Apache Spark 遵循 DAG(有向无环图)执行引擎。它实际上不会触发任何计算,直到需要一个值,所以你必须区分 transformations and actions.

编辑:

在性能方面,我并不完全了解 Spark 的底层实现,但我知道除了在相关阶段添加额外(不必要的)任务外,应该不会有明显的性能损失。根据我的经验,您通常不会连续使用相同 "nature" 的转换(在本例中是两个连续的 map)。在进行改组操作时,您应该更加关注性能,因为您正在四处移动数据,这对您的工作性能有明显的影响。 Here 您可以找到与此相关的常见问题。

简答:rdd.map(f).map(g)将一次性执行。

tl;dr

Spark 将作业分成多个阶段。应用于数据分区的阶段是一项任务。

在一个阶段中,Spark 将尝试流水线化尽可能多的操作。 "Possible" 由重新排列数据的需要决定:需要随机播放的操作通常会破坏管道并创建新阶段。

实际上:

Given `rdd.map(...).map(..).filter(...).sort(...).map(...)`

将导致两个阶段:

.map(...).map(..).filter(...)
.sort(...).map(...)

这可以使用 rdd.toDebugString 从 rdd 中检索 上面相同的作业示例将产生此输出:

val mapped = rdd.map(identity).map(identity).filter(_>0).sortBy(x=>x).map(identity)

scala> mapped.toDebugString
res0: String = 
(6) MappedRDD[9] at map at <console>:14 []
 |  MappedRDD[8] at sortBy at <console>:14 []
 |  ShuffledRDD[7] at sortBy at <console>:14 []
 +-(8) MappedRDD[4] at sortBy at <console>:14 []
    |  FilteredRDD[3] at filter at <console>:14 []
    |  MappedRDD[2] at map at <console>:14 []
    |  MappedRDD[1] at map at <console>:14 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:12 []

现在,来到您问题的关键点:流水线非常高效。完整的管道将应用于每个分区的每个元素一次。这意味着 rdd.map(f).map(g) 的执行速度与 rdd.map(f andThen g) 一样快(有一些可忽略的开销)