Spark cannot write a Delta table if the directory does not exist beforehand (no _delta_log)

I am trying to write a Delta table to MinIO using Spark, but I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o103.save.
: java.io.FileNotFoundException: No such file or directory: s3a://*/*/*/_delta_log

The Maven packages I am currently using are:

--packages org.apache.hadoop:hadoop-aws:3.3.1,
org.apache.spark:spark-hadoop-cloud_2.12:3.2.1,
com.amazonaws:aws-java-sdk-bundle:1.11.901,
io.delta:delta-core_2.12:1.1.0,
io.delta:delta-contribs_2.12:1.1.0, 
com.microsoft.sqlserver:mssql-jdbc:9.2.1.jre11,
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,
ch.cern.sparkmeasure:spark-measure_2.12:0.17
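
For context, the Spark session is configured for MinIO roughly like this (a sketch; the endpoint, access keys, and app name below are placeholders, not my real values):

from pyspark.sql import SparkSession

# Sketch of a session wired for Delta on MinIO via s3a.
# Endpoint, credentials, and app name are placeholders.
spark = (
    SparkSession.builder
    .appName("delta-minio-write")
    # Delta Lake SQL support (required by delta-core 1.1.0 on Spark 3.2)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # s3a connector pointed at MinIO instead of AWS
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")   # placeholder
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")        # placeholder
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")        # placeholder
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)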

Stack trace:

df.write.partitionBy(partition).mode(mode).format("delta").save(f"s3a://{self.bucket}/{table}")
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 740, in save
File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o103.save.
: java.io.FileNotFoundException: No such file or directory: s3a://*/*/*/_delta_log
at org.apache.spark.sql.delta.storage.S3SingleDriverLogStore.listFromInternal(S3SingleDriverLogStore.scala:121)
at org.apache.spark.sql.delta.storage.S3SingleDriverLogStore.exists(S3SingleDriverLogStore.scala:156)
at org.apache.spark.sql.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.scala:174)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:742)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:715)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively(OptimisticTransaction.scala:684)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:112)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:97)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively(OptimisticTransaction.scala:680)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.lockCommitIfEnabled(OptimisticTransaction.scala:661)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:674)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:671)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.liftedTree1(OptimisticTransaction.scala:522)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit(OptimisticTransaction.scala:462)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:112)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:97)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:459)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:457)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:86)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run(WriteIntoDelta.scala:83)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$adapted(WriteIntoDelta.scala:78)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:198)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:154)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.$anonfun$applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

I suspect an incompatibility between the packages. I hope someone has the answer I'm looking for. Thanks for your help.

I ran into the same error: https://github.com/minio/minio/issues/13486

Then I upgraded MinIO, which fixed it:

from: minio/minio:RELEASE.2020-12-03T05-49-24Z
to:   minio/minio:RELEASE.2022-03-26T06-49-28Z.fips

Hope this helps.
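
If it helps, here is a minimal smoke test to re-run after the upgrade (a sketch; the bucket and path are placeholders, and the session is assumed to already be configured for MinIO as in the question):

from pyspark.sql import SparkSession

# Write a tiny Delta table to a fresh prefix (no _delta_log exists there yet),
# then read it back. "my-bucket" and "smoke_test" are placeholders.
spark = SparkSession.builder.getOrCreate()  # assumes s3a/Delta configs are set
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("overwrite").format("delta").save("s3a://my-bucket/smoke_test")
print(spark.read.format("delta").load("s3a://my-bucket/smoke_test").count())  # expect 2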