将 delta lake 写入 AWS S3(没有 Databricks)

Writing delta lake to AWS S3 (Without Databricks)

# Creating PySpark Object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XMLParser").getOrCreate()
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)

然后我可以使用以下代码从我的 s3 存储桶中读取文件

df = spark.read.format("xml").options(rootTag='returnResult', rowTag="query").load("s3n://bucketName/folder/file.xml")

但是当我尝试使用此代码使用 delta lake(parquet 文件)写回 s3 时

df.write.format("delta").mode('overwrite').save("s3n://bucket/folder/file")

我遇到了这个错误

    Py4JJavaError: An error occurred while calling o778.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.
 See https://docs.delta.io/latest/delta-storage.html " for details.

    at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:157)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit.apply$mcJ$sp(OptimisticTransaction.scala:434)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit.apply(OptimisticTransaction.scala:416)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit.apply(OptimisticTransaction.scala:416)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.doCommit(OptimisticTransaction.scala:415)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit.apply$mcJ$sp(OptimisticTransaction.scala:326)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit.apply(OptimisticTransaction.scala:284)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit.apply(OptimisticTransaction.scala:284)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:284)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run.apply(WriteIntoDelta.scala:67)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run.apply(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3n.impl=null: No AbstractFileSystem configured for scheme: s3n
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
    at org.apache.hadoop.fs.FileContext.run(FileContext.java:334)
    at org.apache.hadoop.fs.FileContext.run(FileContext.java:331)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
    ... 53 more

我尝试按照堆栈跟踪中给出的 link 进行操作,但无法弄清楚如何解决这个问题。任何帮助都会得到帮助

创建 spark 会话后,您需要添加 databricks 提供的配置以启用 s3 作为增量存储,例如:

conf = spark.sparkContext._conf.setAll([('spark.delta.logStore.class','org.apache.spark.sql.delta.storage.S3SingleDriverLogStore')])
spark.sparkContext._conf.getAll()

As the name suggests, the S3SingleDriverLogStore implementation only works properly when all concurrent writes originate from a single Spark driver. This is an application property, must be set before starting SparkContext, and cannot change during the lifetime of the context.

来自数据块 访问 here 配置 s3a 路径访问密钥和秘密密钥。

我发现不需要对 S3 位置做任何特殊的事情。我只需要将增量格式文件写到 S3 中尚不存在的文件夹中。如果它已经存在并且其中有对象,我会得到与 OP 相同的错误。

这是我的 spark 会话创建代码:

spark = glueContext.spark_session.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

我使用 s3a。

df.write.format("delta").mode('overwrite').save("s3a://bucket/folder/file")