FileNotFoundException on _temporary/0 directory when saving Parquet files

On an Azure HDInsight cluster, using Python, we save Spark dataframes as Parquet files to Azure Data Lake Storage Gen2 with the following code:

df.write.parquet('abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath', 'overwrite', compression='snappy')

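This usually works, but since we recently upgraded our cluster to run more scripts at the same time (around ten to fifteen), we consistently get the following exception for a small, varying fraction of the scripts: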

Py4JJavaError: An error occurred while calling o2232.parquet. : java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, PUT, https://my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90, PathNotFound, "The specified path does not exist."

I think all the Spark jobs and tasks actually succeed, including the ones that save the table, but then the Python script exits with the exception.


Background information

We are using Spark 2.4.5.4.1.1.2, with Scala version 2.11.12, OpenJDK 64-Bit Server VM 1.8.0_265, and Hadoop 3.1.2.4.1.1.2.

Stack trace:

  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 843, in parquet
    df_to_save.write.parquet(blob_path, mode, compression='snappy')
    self._jwrite.parquet(path)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2232.parquet.
: java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, PUT, https://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90, PathNotFound, "The specified path does not exist. RequestId:1870ec49-e01f-0101-72f8-f260fe000000 Time:2021-12-17T03:42:35.8434071Z"
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:477)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2288)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:382)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Log:

21/12/17 03:42:02 INFO DAGScheduler [Thread-11]: Job 2 finished: saveAsTable at NativeMethodAccessorImpl.java:0, took 1.120535 s
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Write Job 11fc45a5-d398-4f9a-8350-f928c3722886 committed.
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Finished processing stats for write job 11fc45a5-d398-4f9a-8350-f928c3722886.
(...)
21/12/17 03:42:05 INFO ParquetFileFormat [Thread-11]: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:28 ERROR ApplicationMaster [Driver]: User application exited with status 1
21/12/17 03:42:28 INFO ApplicationMaster [Driver]: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)

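There is also another version of this exception that occurs inside a Spark task, which then fails. Spark automatically restarts the failed task, and it usually succeeds on the retry. In some cases the AM reports the application as failed, but I don't understand why, because all jobs succeeded.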

Possible causes

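I would expect that the _temporary directory is not moved until all tasks are done. Looking at the stack trace, the failure happens in AzureBlobFileSystem.mkdirs, which suggests to me that it is trying to create subdirectories somewhere under _temporary/0 but cannot find the 0 directory. I'm not sure whether the _temporary directory exists at that point.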

Related questions

Options to try:

ABFS is a "real" file system, so the S3A zero-rename committers are not needed. Indeed, they won't work. The client is entirely open source: look into the hadoop-azure module.

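The ADLS Gen2 store does have scale problems, but unless you are trying to commit 10,000 files or clean up massively deep directory trees, you won't hit them. If you do get error messages about failures to rename individual files and you are running jobs of that scale, (a) talk to Microsoft about increasing your allocated capacity and (b) pick up this change: https://github.com/apache/hadoop/pull/2971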

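This isn't it. I would guess that you actually have multiple jobs writing to the same output path, with one cleaning up while the other is setting up. In particular, they both seem to have a job ID of "0". Because the same job ID is being used, not only can task setup and task cleanup get mixed up, but it is also possible that when job 1 commits, it includes the output from job 2 for all task attempts that have already been successfully committed.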
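To make that race concrete, here is a minimal local-filesystem sketch. It is not the actual FileOutputCommitter or ABFS code path: the helper names (setup_job, cleanup_job, start_task) are hypothetical, and a non-recursive os.mkdir stands in for the directory creation that fails in the stack trace. It only illustrates how two jobs that share `_temporary/0` under one output path can pull the directory out from under each other.

```python
import os
import shutil
import tempfile


def setup_job(output_dir, job_id=0):
    """Create <output>/_temporary/<job_id>, roughly what job setup does."""
    job_dir = os.path.join(output_dir, "_temporary", str(job_id))
    os.makedirs(job_dir, exist_ok=True)
    return job_dir


def cleanup_job(output_dir):
    """Delete the whole _temporary tree, roughly what job cleanup does."""
    shutil.rmtree(os.path.join(output_dir, "_temporary"), ignore_errors=True)


def start_task(job_dir, task_name):
    """Create a task directory; raises if job_dir no longer exists."""
    task_dir = os.path.join(job_dir, task_name)
    os.mkdir(task_dir)  # deliberately non-recursive (assumption for the demo)
    return task_dir


def simulate_collision():
    output_dir = tempfile.mkdtemp()
    try:
        job_a_dir = setup_job(output_dir)  # job A: ID 0
        job_b_dir = setup_job(output_dir)  # job B: also ID 0
        shared = job_a_dir == job_b_dir    # both jobs use the same directory
        cleanup_job(output_dir)            # job A finishes and cleans up
        try:
            start_task(job_b_dir, "task_0")  # job B is still running
            failed = False
        except FileNotFoundError:
            failed = True
        return shared, failed
    finally:
        shutil.rmtree(output_dir, ignore_errors=True)


shared_dir, task_failed = simulate_collision()
print("same _temporary/0 directory:", shared_dir)
print("job B failed after job A cleaned up:", task_failed)
```

If the two jobs wrote to different output paths, job A's cleanup would never touch job B's `_temporary` tree.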

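I believe this has been a known problem with Spark standalone deployments, though I can't find a relevant JIRA. SPARK-24552 is close, but should have been fixed in your version. SPARK-33402, "Jobs launched in same second have duplicate MapReduce JobIDs", is about job IDs coming from the current system time rather than being 0. Still, you can try upgrading your Spark version to see if the problem goes away.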
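The SPARK-33402 failure mode is easy to picture with a small sketch. The timestamp format below (second resolution, `%Y%m%d%H%M%S`) and the function name job_tracker_id are assumptions made for illustration, not Spark's actual code; the point is only that any ID derived from a clock truncated to seconds collides for jobs launched within the same second.

```python
from datetime import datetime, timedelta


def job_tracker_id(launch_time):
    """Hypothetical ID derived from the launch time, truncated to seconds."""
    return launch_time.strftime("%Y%m%d%H%M%S")


first = datetime(2021, 12, 17, 3, 42, 5, 100000)
second = first + timedelta(milliseconds=400)  # launched in the same second
third = first + timedelta(seconds=1)          # launched one second later

ids_collide = job_tracker_id(first) == job_tracker_id(second)
ids_differ = job_tracker_id(first) != job_tracker_id(third)
print("same-second jobs share an ID:", ids_collide)
print("jobs one second apart differ:", ids_differ)
```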

My suggestions

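  1. Make sure your jobs are not writing to the same table simultaneously. Things will get into a mess.
  2. Grab the most recent version of Spark that you are happy with.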
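One possible way to follow the first suggestion, sketched under assumptions: give every concurrent script its own output directory so that no two jobs share a `_temporary` tree. The helper unique_output_path and the `run-<id>` naming are hypothetical, not part of Spark or the original post; the base path is the one from the question.

```python
import uuid


def unique_output_path(base_path, run_id=None):
    """Return a per-run subdirectory of base_path (hypothetical helper)."""
    run_id = run_id or uuid.uuid4().hex
    return "{}/run-{}".format(base_path.rstrip("/"), run_id)


base = "abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath"
path_a = unique_output_path(base)
path_b = unique_output_path(base)
print(path_a)
print(path_b)

# In each script, the write from the question would then become:
# df.write.parquet(path_a, 'overwrite', compression='snappy')
```

The trade-off is that readers must then be pointed at a specific run directory. If all scripts really have to land in the same path, the alternative is to serialize the writes so that only one job at a time sets up and cleans up that path.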