FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro

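I get the error below when saving a DataFrame in avro for the second time. If I delete sub_folder/part-00000-XXX-c000.avro after saving and then try to save the same dataset, I get the following: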

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

The message suggests refreshing the table, but according to the output of sparkSession.catalog.listTables().show(), there is no table to refresh.

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The previously saved DataFrame looks like this. The application is supposed to update it:

+--------------------+--------------------+
|              Col1  |               Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+

To me this is clearly a cache problem. However, every attempt to clear the cache has failed:

dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite) // Any save mode gives the same error
  .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()

This is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

    val df = sparkSession.read
      .format("avro")
      .load(path)
      .select("*")

    df.as[T]
  }

Finally, here is the stack trace. Thank you very much for your help!

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 10 more

*Reading from and writing to the same location causes this issue. It was also discussed in this forum, along with my answer there.*

The following message in the error is misleading. The actual problem is reading from and writing to the same location.

You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL
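To make the reasoning concrete: as far as I understand, an overwrite clears the target directory before the lazily evaluated read has actually run, so the write ends up scanning files that are already gone. The sketch below is not Spark code. It is a plain-Scala analogy on a single local file, and the names `LazyOverwriteDemo`, `lazyRead` and `overwrite` are made up for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import scala.util.Try

object LazyOverwriteDemo {
  // Stand-in for a lazy Dataset: nothing is read until the function is called.
  def lazyRead(file: Path): () => String =
    () => new String(Files.readAllBytes(file), UTF_8)

  // Stand-in for an overwrite: the target is removed first,
  // and only then is the input evaluated and written.
  def overwrite(file: Path, rows: () => String): Unit = {
    Files.deleteIfExists(file)
    Files.write(file, rows().getBytes(UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val file = Files.createTempFile("lazy-overwrite", ".txt")
    Files.write(file, "a,b".getBytes(UTF_8))

    // Lazy read of the very file being overwritten: the read happens too late.
    val failed = Try(overwrite(file, lazyRead(file)))
    println(failed.isFailure) // true

    // Materialise the data first (the analogue of cache + action), then overwrite.
    Files.write(file, "a,b".getBytes(UTF_8))
    val cached = lazyRead(file)()
    overwrite(file, () => cached + ",cleanup")
    println(new String(Files.readAllBytes(file), UTF_8)) // a,b,cleanup
  }
}
```

The second half mirrors the idea behind both options below: make sure the data no longer depends on the original files at the moment they are replaced.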

My example differs from yours: I use parquet, whereas your case uses avro.

I have 2 options for you.

Option 1 (cache and show, which works as follows...):

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDS; assumes a SparkSession named `spark`

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")
df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back

val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // the clean-up step you described

// THE NEXT 2 STEPS ARE IMPORTANT: `cache` plus an action (a light `show`, or `count`).
// Without them the write below fails with FileNotFoundException.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved in the same directory he read from; the final records after clean-up are:")
df2.show(false)

Option 2:

1) Save the DataFrame to a different avro folder.

2) Delete the old avro folder.

3) Finally, rename the newly created avro folder to the old name.
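The three steps above could be sketched roughly as follows, using the Hadoop `FileSystem` API so that the same code addresses local paths and HDFS paths alike. This is only a sketch under assumptions: the helper name `overwriteViaTemp`, the `_tmp` suffix and the `format` parameter are hypothetical, and writing avro assumes the spark-avro module is on the classpath.

```scala
import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object SafeOverwrite {
  // Hypothetical helper: the name, the "_tmp" suffix and the `format`
  // parameter are illustrative and not part of any Spark API.
  def overwriteViaTemp[T](spark: SparkSession,
                          dataset: Dataset[T],
                          path: String,
                          format: String = "avro"): Unit = {
    val target = new Path(path)
    val temp   = new Path(target.getParent, target.getName + "_tmp")
    val fs     = target.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // 1) Write to a different folder; the original files stay intact while Spark reads them.
    dataset.write.format(format).mode(SaveMode.Overwrite).save(temp.toString)

    // 2) Delete the old folder (recursively).
    fs.delete(target, true)

    // 3) Rename the new folder to the old name; rename reports failure via its return value.
    if (!fs.rename(temp, target))
      throw new IOException(s"Could not rename $temp to $target")
  }
}
```

After the swap, `dataset` still points at the files that were just deleted, so it would need to be read again from `path` before any further use.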

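Thank you very much, Ram Ghadiyaram!

Solution 2 solved my problem, but only on my local Ubuntu machine. When I tested on HDFS, the problem remained.

Solution 1 was the definitive fix. This is what my code looks like now: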

private def doWriteToPath[T <: Product with Serializable: TypeTag: ClassTag](dataset: Dataset[T], path: String): Unit = {

  // clear any previously cached avro
  sparkSession.catalog.clearCache()

  // update the cache for this particular dataset, and trigger an action
  dataset.cache().show(1)

  dataset.write
    .format("avro")
    .option("path", path)
    .mode(SaveMode.Overwrite)
    .save()
}

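Some comments: I did check that post and tried its solution, without success. I ruled it out as my problem for the following reasons:

  • I created a temp folder named 'sub_folder_temp' under 'main_folder', but saving still failed.
  • Saving the same non-empty dataset to the same path in json format actually works without the workaround discussed here.
  • Saving an empty dataset of the same type [T] to the same path actually works without the workaround discussed here.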