世系如何在 Apache Spark 的 RDD 中传递

How does lineage get passed down in RDDs in Apache Spark

是否每个 RDD 都指向同一个谱系图

当父 RDD 将其血统赋予新 RDD 时,子 RDD 是否也复制了血统图,因此父 RDD 和子 RDD 具有不同的图。这样的话是不是很占内存?

每个 RDD 都维护一个指向一个或多个父节点的指针,以及关于它与父节点的关系类型的元数据。例如,当我们在一个 RDD 上调用 val b = a.map() 时,RDD b 只是保留一个对其父 a 的引用(并且从不复制),这就是一个沿袭 .

并且当驱动程序提交作业时,RDD 图被序列化到工作节点,以便每个工作节点在不同分区上应用一系列转换(如映射过滤器等)。此外,如果发生某些故障,此 RDD 谱系将用于重新计算数据。

为了显示 RDD 的沿袭,Spark 提供了调试方法 toDebugString() 方法。

考虑以下示例:

val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
                    .map(words => (words(0), 1))
                    .reduceByKey{(a,b) => a + b}

splitedLines RDD上执行toDebugString(),将输出如下,

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[5] at map at <console>:24 []
    |  MapPartitionsRDD[4] at map at <console>:23 []
    |  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
    |  log.txt HadoopRDD[0] at textFile at <console>:21 []

有关 Spark 内部工作原理的更多信息,请阅读 my another post

调用转换(映射或过滤器等)时,Spark 不会立即执行它,而是为每个转换创建一个沿袭。 血统将跟踪必须在该 RDD 上应用的所有转换, 包括它必须读取数据的位置。

例如,考虑下面的例子

val myRdd = sc.textFile("spam.txt")
val filteredRdd = myRdd.filter(line => line.contains("wonder"))
filteredRdd.count()

sc.textFile() 和 myRdd.filter() 不会立即执行, 它只会在 RDD 上调用 Action 时执行 - 这里是 filteredRdd.count()。

Action 用于将结果保存到某个位置或显示它。 RDD沿袭信息也可以通过命令filteredRdd.toDebugString打印出来(这里filteredRdd就是RDD)。 此外,DAG 可视化以非常直观的方式显示了完整的图形,如下所示: