世系如何在 Apache Spark 的 RDD 中传递
How does lineage get passed down in RDDs in Apache Spark
是否每个 RDD 都指向同一个谱系图
或
当父 RDD 将其血统赋予新 RDD 时,子 RDD 是否也复制了血统图,因此父 RDD 和子 RDD 具有不同的图。这样的话是不是很占内存?
每个 RDD 都维护一个指向一个或多个父节点的指针,以及关于它与父节点的关系类型的元数据。例如,当我们在一个 RDD 上调用 val b = a.map()
时,RDD b
只是保留一个对其父 a
的引用(并且从不复制),这就是一个沿袭 .
并且当驱动程序提交作业时,RDD 图被序列化到工作节点,以便每个工作节点在不同分区上应用一系列转换(如映射过滤器等)。此外,如果发生某些故障,此 RDD 谱系将用于重新计算数据。
为了显示 RDD 的沿袭,Spark 提供了调试方法 toDebugString()
方法。
考虑以下示例:
val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
.map(words => (words(0), 1))
.reduceByKey{(a,b) => a + b}
在splitedLines
RDD上执行toDebugString()
,将输出如下,
(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
| MapPartitionsRDD[4] at map at <console>:23 []
| log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
| log.txt HadoopRDD[0] at textFile at <console>:21 []
有关 Spark 内部工作原理的更多信息,请阅读 my another post
调用转换(映射或过滤器等)时,Spark 不会立即执行它,而是为每个转换创建一个沿袭。
血统将跟踪必须在该 RDD 上应用的所有转换,
包括它必须读取数据的位置。
例如,考虑下面的例子
val myRdd = sc.textFile("spam.txt")
val filteredRdd = myRdd.filter(line => line.contains("wonder"))
filteredRdd.count()
sc.textFile() 和 myRdd.filter() 不会立即执行,
它只会在 RDD 上调用 Action 时执行 - 这里是 filteredRdd.count()。
Action 用于将结果保存到某个位置或显示它。
RDD沿袭信息也可以通过命令filteredRdd.toDebugString打印出来(这里filteredRdd就是RDD)。
此外,DAG 可视化以非常直观的方式显示了完整的图形,如下所示:
是否每个 RDD 都指向同一个谱系图
或
当父 RDD 将其血统赋予新 RDD 时,子 RDD 是否也复制了血统图,因此父 RDD 和子 RDD 具有不同的图。这样的话是不是很占内存?
每个 RDD 都维护一个指向一个或多个父节点的指针,以及关于它与父节点的关系类型的元数据。例如,当我们在一个 RDD 上调用 val b = a.map()
时,RDD b
只是保留一个对其父 a
的引用(并且从不复制),这就是一个沿袭 .
并且当驱动程序提交作业时,RDD 图被序列化到工作节点,以便每个工作节点在不同分区上应用一系列转换(如映射过滤器等)。此外,如果发生某些故障,此 RDD 谱系将用于重新计算数据。
为了显示 RDD 的沿袭,Spark 提供了调试方法 toDebugString()
方法。
考虑以下示例:
val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
.map(words => (words(0), 1))
.reduceByKey{(a,b) => a + b}
在splitedLines
RDD上执行toDebugString()
,将输出如下,
(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
| MapPartitionsRDD[4] at map at <console>:23 []
| log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
| log.txt HadoopRDD[0] at textFile at <console>:21 []
有关 Spark 内部工作原理的更多信息,请阅读 my another post
调用转换(映射或过滤器等)时,Spark 不会立即执行它,而是为每个转换创建一个沿袭。 血统将跟踪必须在该 RDD 上应用的所有转换, 包括它必须读取数据的位置。
例如,考虑下面的例子
val myRdd = sc.textFile("spam.txt")
val filteredRdd = myRdd.filter(line => line.contains("wonder"))
filteredRdd.count()
sc.textFile() 和 myRdd.filter() 不会立即执行, 它只会在 RDD 上调用 Action 时执行 - 这里是 filteredRdd.count()。
Action 用于将结果保存到某个位置或显示它。 RDD沿袭信息也可以通过命令filteredRdd.toDebugString打印出来(这里filteredRdd就是RDD)。 此外,DAG 可视化以非常直观的方式显示了完整的图形,如下所示: