checkpointing / persisting / shuffling 似乎并不 'short circuit' rdd 的血统,如 'learning spark' 书中详述
checkpointing / persisting / shuffling does not seem to 'short circuit' the lineage of an rdd as detailed in 'learning spark' book
学习Spark,看了以下内容:
In addition to pipelining, Spark’s internal scheduler may truncate the lineage of the RDD graph if an existing RDD has already been persisted in cluster memory or on disk. Spark can “short-circuit” in this case and just begin computing based on the persisted RDD. A second case in which this truncation can happen is when an RDD is already materialized as a side effect of an earlier shuffle, even if it was not explicitly persist()ed. This is an under-the-hood optimization that takes advantage of the fact that Spark shuffle outputs are written to disk, and exploits the fact that many times portions of the RDD graph are recomputed.
所以,我决定尝试使用一个简单的程序(如下)来查看它的实际效果:
val pairs = spark.sparkContext.parallelize(List((1,2)))
val x = pairs.groupByKey()
x.toDebugString // before collect
x.collect()
x.toDebugString // after collect
spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
x.collect()
x.toDebugString // after checkpoint
阅读Spark书上的上述段落后,我没有看到我期望的内容。每次调用此方法时,我都会看到完全相同的 toDebugString 输出——每次都指示两个阶段(我本以为检查点应该截断沿袭之后只有一个阶段。)像这样:
scala> x.toDebugString // after collect
res5: String =
(8) ShuffledRDD[1] at groupByKey at <console>:25 []
+-(8) ParallelCollectionRDD[0] at parallelize at <console>:23 []
我想知道我是否忽略了 "may" 这个词,就像在 "schedule MAY truncate the lineage" 中一样。在其他情况下,对于我上面编写的相同程序,这种截断是否可能发生?或者我写的小程序没有做正确的事情来强制截断沿袭?提前感谢您提供的任何见解!
我认为你应该 persist/checkpoint 在 你先 collect
之前。
从我的代码来看,你得到的东西看起来是正确的,因为当 spark 首先执行时 collect
它不知道它应该保留或保存任何东西。
您可能还需要保存 x.persist
的结果然后使用它...
我建议 - 试试看:
val pairs = spark.sparkContext.parallelize(List((1,2)))
val x = pairs.groupByKey()
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
// **Also maybe do val xx = x.persist(...) and use xx later.**
x.toDebugString // before collect
x.collect()
x.toDebugString // after collect
spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.collect()
x.toDebugString // after checkpoint
学习Spark,看了以下内容:
In addition to pipelining, Spark’s internal scheduler may truncate the lineage of the RDD graph if an existing RDD has already been persisted in cluster memory or on disk. Spark can “short-circuit” in this case and just begin computing based on the persisted RDD. A second case in which this truncation can happen is when an RDD is already materialized as a side effect of an earlier shuffle, even if it was not explicitly persist()ed. This is an under-the-hood optimization that takes advantage of the fact that Spark shuffle outputs are written to disk, and exploits the fact that many times portions of the RDD graph are recomputed.
所以,我决定尝试使用一个简单的程序(如下)来查看它的实际效果:
val pairs = spark.sparkContext.parallelize(List((1,2)))
val x = pairs.groupByKey()
x.toDebugString // before collect
x.collect()
x.toDebugString // after collect
spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
x.collect()
x.toDebugString // after checkpoint
阅读Spark书上的上述段落后,我没有看到我期望的内容。每次调用此方法时,我都会看到完全相同的 toDebugString 输出——每次都指示两个阶段(我本以为检查点应该截断沿袭之后只有一个阶段。)像这样:
scala> x.toDebugString // after collect
res5: String =
(8) ShuffledRDD[1] at groupByKey at <console>:25 []
+-(8) ParallelCollectionRDD[0] at parallelize at <console>:23 []
我想知道我是否忽略了 "may" 这个词,就像在 "schedule MAY truncate the lineage" 中一样。在其他情况下,对于我上面编写的相同程序,这种截断是否可能发生?或者我写的小程序没有做正确的事情来强制截断沿袭?提前感谢您提供的任何见解!
我认为你应该 persist/checkpoint 在 你先 collect
之前。
从我的代码来看,你得到的东西看起来是正确的,因为当 spark 首先执行时 collect
它不知道它应该保留或保存任何东西。
您可能还需要保存 x.persist
的结果然后使用它...
我建议 - 试试看:
val pairs = spark.sparkContext.parallelize(List((1,2)))
val x = pairs.groupByKey()
x.checkpoint()
x.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
// **Also maybe do val xx = x.persist(...) and use xx later.**
x.toDebugString // before collect
x.collect()
x.toDebugString // after collect
spark.sparkContext.setCheckpointDir("/tmp")
// try both checkpointing and persisting to disk to cut lineage
x.collect()
x.toDebugString // after checkpoint