persisted/cached RDD 上的 Spark RDD 检查点正在执行 DAG 两次

Spark RDD checkpoint on persisted/cached RDDs are performing the DAG twice

当我运行代码如下:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())

并观察 Yarn 中的各个阶段,我注意到 Spark 进行了两次 DAG 计算——一次用于具体化 RDD 并缓存它的 distinct+count,然后完全是第二次创建检查点副本。

既然 RDD 已经具体化并缓存了,为什么检查点不简单地利用这一点,并将缓存的分区保存到磁盘?

是否有现有的方法(某种配置设置或代码更改)强制 Spark 利用此优势,并且仅 运行 操作一次,检查点只会复制内容?

我需要 "materialize" 两次吗?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())

我已经创建了一个 Apache Spark Jira 票证来将此作为功能请求: https://issues.apache.org/jira/browse/SPARK-8666

看起来这可能是一个已知问题。查看较旧的 JIRA 票证,https://issues.apache.org/jira/browse/SPARK-8582

您缓存的数据可能由于内存不足被逐出,您可以打开SparkUI查看是否属实。

这是一个老问题。但它也影响了我,所以我做了一些挖掘。我在 jira 和 github 的 change-tracking 历史记录中发现了一堆非常无用的搜索结果。这些搜索结果包含来自开发人员的大量 tech-babble 关于他们提议的编程更改。这对我来说并没有提供太多信息,我建议限制你花在看它上的时间。

关于此事我能找到的最清晰的信息如下: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do a rdd.cache() before rdd.checkpoint()

考虑到 OP 实际上确实使用了持久化和检查点,他可能是在正确的轨道上。我怀疑唯一的问题是他调用检查点的方式。我是 spark 的新手,但我认为他应该这样做:

newRDD = newRDD.checkpoint

希望这是清楚的。根据我的测试,这消除了我的一个数据帧的冗余重新计算。