具有缓存和操作的奇怪 Spark 行为

Strange Spark behavior with cache and action

我一直很想知道为什么在 运行 某个 spark 作业时我会出现奇怪的行为。如果我在缓存数据帧之后或将数据帧写回 hdfs 之前立即执行操作(.show(1) 方法),作业将出错。

这里有一个与 SO 非常相似的 post:

Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'

基本上另一个 post 解释说,当你从你正在写入的同一个 HDFS 目录读取时,你的 SaveMode"overwrite",那么你会得到一个 java.io.FileNotFoundException

但在这里我发现,仅仅移动程序中操作所在的位置会产生截然不同的结果——要么完成程序,要么给出这个异常。

我想知道是否有人可以解释为什么 Spark 在这里不一致?

 val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(schema)
    .load(myPath)

// If I cache it here or persist it then do an action after the cache, it will occasionally 
// not throw the error. This is when completely restarting the SparkSession so there is no
// risk of another user interfering on the same JVM.

      myDF.cache()
      myDF.show(1)

// Just an example.
// Many different transformations are then applied...

val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)

val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)

// Below is the same .show(1) action call as was previously done, only this below
// action ALWAYS results in a successful completion and the above .show(1) sometimes results
// in FileNotFoundException and sometimes results in successful completion. The only
// thing that changes among test runs is only one is executed. Either
// fourthDF.show(1) or myDF.show(1) is left commented out

fourthDF.show(1)
fourthDF.write
    .mode(writeMode)
    .option("header", "false")
    .option("delimiter", "\t")
    .csv(myPath)

尝试使用 count 而不是 show(1),我认为问题是由于 Spark 试图变得聪明而不是加载整个数据帧(因为 show 不需要一切). 运行 count 强制 Spark 加载并正确缓存所有数据,这有望消除不一致。

Spark 仅按需具体化 rdds,大多数操作需要读取 DF 的所有分区,例如 count(),但 take() 和 first() 等操作不需要所有分区。

在您的情况下,它需要一个分区,因此只有 1 个分区被具体化和缓存。然后,当您执行 count() 时,所有分区都需要具体化并缓存到可用内存允许的范围内。