Amazon s3a returns 400 Bad Request with Spark-redshift library

I am getting a java.io.IOException: s3n://bucket-name : 400 : Bad Request error while loading Redshift data through the spark-redshift library.

Both the Redshift cluster and the S3 bucket are in the Mumbai (ap-south-1) region, which supports only AWS Signature Version 4.

Here is the full error stack trace:

2017-01-13 13:14:22 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
            at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
            at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader.apply(RedshiftFileFormat.scala:92)
            at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader.apply(RedshiftFileFormat.scala:80)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(fileSourceInterfaces.scala:279)
            at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(fileSourceInterfaces.scala:263)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:116)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:91)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
            at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
            at org.apache.spark.scheduler.Task.run(Task.scala:86)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
            at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
            at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
            at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
            ... 30 more

Here is my Java code:

SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

SQLContext sqlContext=new SQLContext(sparkContext);
Dataset<Row> dataset = sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

I was able to solve the problem in Spark local mode (referring to this) with the following changes:

1) I replaced the jets3t jar with version 0.9.4

2) Changed the jets3t configuration properties to support AWS Signature Version 4 buckets, as shown below:

Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");

But now, when I try to run the job in cluster mode (Spark standalone mode, using Mesos as the resource manager), the error shows up again :(

Any help would be greatly appreciated!

That stack trace means you are using the older, jets3t-based s3n connector. The settings you are configuring only apply to the newer s3a connector. Use a URL like s3a:// to pick up the new one.

Given that you are trying to use the V4 API, you will also need to set fs.s3a.endpoint. The 400 Bad Request response is exactly what you see when you try to authenticate with V4 against the central endpoint.
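As a minimal sketch of that advice applied to the question's setup (assuming Hadoop 2.7+, where the s3a connector reads fs.s3a.access.key and fs.s3a.secret.key rather than the fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey names used above):

// Do NOT remap fs.s3a.impl to NativeS3FileSystem: that silently routes
// s3a:// URLs through the old jets3t/s3n code path seen in the stack trace.
// Left alone, s3a:// resolves to org.apache.hadoop.fs.s3a.S3AFileSystem.
sparkContext.hadoopConfiguration().set("fs.s3a.access.key", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.secret.key", fs_s3a_awsSecretAccessKey);
// A V4-only region such as ap-south-1 needs its region-specific endpoint:
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");

This also requires the hadoop-aws jar (and its matching AWS SDK) on the classpath of both the driver and the executors.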

Actual problem:

Updating the Jets3tProperties at runtime to support AWS S3 Signature Version 4 worked in local mode, but not in cluster mode, because the properties were only being updated on the driver's JVM and not on any of the executors' JVMs.
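One way to see this, as a purely hypothetical diagnostic (not part of the original post), is to print the property from inside a task. This assumes the same dataset as in the question and JetS3t's standard getStringProperty accessor:

// Hypothetical check: ask JetS3t, inside an executor task, which request
// signature version it is actually configured with on that JVM.
dataset.foreachPartition((ForeachPartitionFunction<Row>) partition -> {
    String version = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME)
            .getStringProperty("storage-service.request-signature-version", "(default)");
    System.out.println("Signature version on this executor: " + version);
});

In local mode this reports AWS4-HMAC-SHA256, because the driver and executor share a JVM; on the cluster it reports the default, which is what the fix below addresses.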

Solution:

By referring to this link, I found a workaround to update the Jets3tProperties on all the executors.

Following that link, I added an extra code snippet inside a .foreachPartition() call that updates the Jets3tProperties. It runs once per partition, and since Jets3tProperties.getInstance() returns a JVM-wide singleton, each executor is configured as soon as it processes its first partition.

Here is the code:

Dataset<Row> dataset = sqlContext
        .read()
        .format("com.databricks.spark.redshift")
        .option("url", redshiftUrl)
        .option("query", query)
        .option("aws_iam_role", aws_iam_role)
        .option("tempdir", "s3a://bucket-name/temp-dir")
        .load();

dataset.foreachPartition((ForeachPartitionFunction<Row>) partition -> {
    // Runs on whichever executor JVM processes this partition, before the
    // partition's data is read. Jets3tProperties.getInstance() returns a
    // JVM-wide singleton, so this update is idempotent and repeating it for
    // later partitions on the same executor is harmless.
    Jets3tProperties myProperties =
            Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
    myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
    myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
    myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
});
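Note that this relies on Jets3tProperties being cached per JVM: once any task from this dataset has run on an executor, subsequent jets3t-backed S3 requests in that JVM are signed with V4. An alternative worth considering (untested here) is to place a jets3t.properties file containing the same three settings on every executor's classpath, since JetS3t loads that file (Constants.JETS3T_PROPERTIES_FILENAME) from the classpath.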