具有缓存和操作的奇怪 Spark 行为

Strange Spark behavior with cache and action

我一直很想知道为什么在 运行 某个 spark 作业时我会出现奇怪的行为。如果我在缓存数据帧之后或将数据帧写回 hdfs 之前立即执行操作(.show(1) 方法),作业将出错。

这里有一个与 SO 非常相似的 post:

Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'

基本上另一个 post 解释说,当你从你正在写入的同一个 HDFS 目录读取时,你的 SaveMode"overwrite",那么你会得到一个 java.io.FileNotFoundException


我想知道是否有人可以解释为什么 Spark 在这里不一致?

 val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")

// If I cache it here or persist it then do an action after the cache, it will occasionally 
// not throw the error. This is when completely restarting the SparkSession so there is no
// risk of another user interfering on the same JVM.


// Just an example.
// Many different transformations are then applied...

val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)

val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)

// Below is the same .show(1) action call as was previously done, only this below
// action ALWAYS results in a successful completion and the above .show(1) sometimes results
// in FileNotFoundException and sometimes results in successful completion. The only
// thing that changes among test runs is only one is executed. Either
// fourthDF.show(1) or myDF.show(1) is left commented out

    .option("header", "false")
    .option("delimiter", "\t")

尝试使用 count 而不是 show(1),我认为问题是由于 Spark 试图变得聪明而不是加载整个数据帧(因为 show 不需要一切). 运行 count 强制 Spark 加载并正确缓存所有数据,这有望消除不一致。

Spark 仅按需具体化 rdds,大多数操作需要读取 DF 的所有分区,例如 count(),但 take() 和 first() 等操作不需要所有分区。

在您的情况下,它需要一个分区,因此只有 1 个分区被具体化和缓存。然后,当您执行 count() 时,所有分区都需要具体化并缓存到可用内存允许的范围内。