Strange requests searching for _delta_log when using a custom FileFormat on Databricks

I observe very strange requests issued by Databricks when using a custom file format. I need to implement a custom FileFormat to read binary files in Spark SQL. I implemented the FileFormat class (the implementation is mostly a copy/paste from AvroFileFormat), registered it via META-INF/services, and use it like this:

val df = spark.read.format("my-format").load("s3://exos-dev/table_data/file.bin")
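
For context, the registration side of the format looks roughly like this. This is a minimal sketch assuming a hypothetical package com.example; the real class is mostly copy/pasted from AvroFileFormat:

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

// Spark discovers this class via the service-loader file
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister,
// which lists the fully qualified class name (here com.example.RecAvroFormat).
class RecAvroFormat extends FileFormat with DataSourceRegister {

  // shortName is what spark.read.format("my-format") resolves against
  override def shortName(): String = "my-format"

  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    ??? // schema inference over the binary files goes here
  }

  // a real format would also override buildReaderWithPartitionValues
  // for the read path; this sketch is read-only
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    throw new UnsupportedOperationException("write is not supported")
}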

It works both in local Spark and on Databricks. However, on Databricks I see that before my file format implementation is even loaded, it tries to find _delta_log under the directory s3://exos-dev/table_data/file.bin. It actually issues 3 requests, see the error log.

Finally, after the 3 S3 reads, it loaded my RecAvroFormat implementation and used it. This is where it all starts:

at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)

The method preprocessDeltaLoading exists only in Databricks Spark. How can I disable it? I need to read 10k data frames in a loop, so it issues 30k useless S3 requests, and this appears to slow my code down.

Error log:

20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/file.bin/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 98EAF67749BB0068, Extended Request ID: ...
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4921)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4867)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1320)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata.apply(EnforcingDatabricksS3Client.scala:220)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client$$anonfun$getObjectMetadata.apply(EnforcingDatabricksS3Client.scala:220)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.s3a.aws.DatabricksS3Client$class.retryRequest(DatabricksS3Client.scala:151)
    at com.databricks.s3a.aws.DatabricksS3Client$class.withExponentialBackoff(DatabricksS3Client.scala:125)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client.withExponentialBackoff(EnforcingDatabricksS3Client.scala:28)
    at com.databricks.s3a.aws.EnforcingDatabricksS3Client.getObjectMetadata(EnforcingDatabricksS3Client.scala:219)
    at com.databricks.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1876)
    at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1378)
    at com.databricks.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot.apply$mcZ$sp(DeltaTable.scala:171)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot.apply(DeltaTable.scala:171)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$$anonfun$findDeltaTableRoot.apply(DeltaTable.scala:171)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:171)
    at org.apache.spark.sql.DataFrameReader.preprocessDeltaLoading(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:278)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:1)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:51)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw$$iw.<init>(command-3147709739428370:53)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw$$iw.<init>(command-3147709739428370:55)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw$$iw.<init>(command-3147709739428370:57)
    at line2bfff6d09387477794603ac98a6c816640.$read$$iw.<init>(command-3147709739428370:59)
    at line2bfff6d09387477794603ac98a6c816640.$read.<init>(command-3147709739428370:61)
    at line2bfff6d09387477794603ac98a6c816640.$read$.<init>(command-3147709739428370:65)
    at line2bfff6d09387477794603ac98a6c816640.$read$.<clinit>(command-3147709739428370)
    at line2bfff6d09387477794603ac98a6c816640.$eval$.$print$lzycompute(<notebook>:7)
    at line2bfff6d09387477794603ac98a6c816640.$eval$.$print(<notebook>:6)
    at line2bfff6d09387477794603ac98a6c816640.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl.apply$mcV$sp(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute.apply(DriverLocal.scala:396)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute.apply(DriverLocal.scala:373)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)

20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 15CAB9301B744BB1
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com table_data/_delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 15CAB9301B744BB1, Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=, Cloud Provider: AWS, Instance ID: i-04d201385f2c85deb (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 15CAB9301B744BB1; S3 Extended Request ID: W5BfMldhZBseAEeuu6pGrVg7j/BYwA6i0i9ulx8EeQolnesCbDUROysO00K+go2lWIAihBp65HQ=), S3 Extended Request ID: ...
  ...
20/05/15 07:11:14 INFO S3AFileSystem: Caught an AmazonServiceException, which means your request made it to Amazon S3, but was rejected with an error response for some reason.
20/05/15 07:11:14 INFO S3AFileSystem: Error Message: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; Request ID: 468EF53831074B5C, ...
20/05/15 07:11:14 INFO S3AFileSystem: HTTP Status Code: 403
20/05/15 07:11:14 INFO S3AFileSystem: AWS Error Code: 403 Forbidden
20/05/15 07:11:14 INFO S3AFileSystem: Error Type: Client
20/05/15 07:11:14 INFO S3AFileSystem: Request ID: 468EF53831074B5C
20/05/15 07:11:14 INFO S3AFileSystem: Class Name: com.amazonaws.services.s3.model.AmazonS3Exception
20/05/15 07:11:14 INFO S3AFileSystem: Stack trace: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden; request: HEAD https://exos-dev.s3.amazonaws.com _delta_log {} aws-sdk-java/1.11.595 Linux/4.4.0-1105-aws OpenJDK_64-Bit_Server_VM/25.242-b08 java/1.8.0_242 scala/2.11.12 vendor/Private_Build com.amazonaws.services.s3.model.GetObjectMetadataRequest; ...
    ...

20/05/15 07:11:14 INFO RecAvroFormat: -------- RecAvroFormat works ------------

20/05/15 07:11:14 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 1; threshold: 32

Any ideas?

I'll leave my findings here.

The DataFrameReader#load method always calls DataFrameReader#preprocessDeltaLoading, for any reader format: a custom one, avro, or parquet. What preprocessDeltaLoading does is search for a Delta table root in the path passed to the load method.
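
Judging from the stack trace and the three HEAD requests (file.bin/_delta_log, then table_data/_delta_log, then _delta_log at the bucket root), the search walks up the directory tree from the load path. A paraphrase of the observed behaviour, not the actual Databricks source:

import org.apache.hadoop.fs.{FileSystem, Path}

// Starting from the load path, probe <dir>/_delta_log at every level
// up to the bucket root; each fs.exists call is one S3 HEAD request.
def findDeltaTableRoot(fs: FileSystem, path: Path): Option[Path] = {
  var current: Path = path
  while (current != null) {
    if (fs.exists(new Path(current, "_delta_log"))) return Some(current)
    current = current.getParent // null once the root is passed
  }
  None
}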

So it always sends 3 extra requests. However, it does this if and only if a single path is passed to the load method. If load is called with an array of paths, it does not try to search for the Delta root.

What I did: I passed an extra, empty avro file alongside the real one (see the snippet below). This does create an extra task that reads the empty file, but at least I avoid the flood of Forbidden errors with their retries and backoff.
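
For example (empty.avro is a placeholder name; any empty object readable by the format works):

// Passing two paths makes load take the multi-path overload
// (load(paths: String*)), which skips the Delta root lookup entirely.
val df = spark.read
  .format("my-format")
  .load("s3://exos-dev/table_data/file.bin", "s3://exos-dev/table_data/empty.avro")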

Databricks should be asked to make this preprocessDeltaLoading method configurable, with an option to disable it.