Amazon s3a returns 400 Bad Request with Spark-redshift library
I am getting a java.io.IOException: s3n://bucket-name : 400 : Bad Request error while loading Redshift data through the spark-redshift library.
Both the Redshift cluster and the S3 bucket are in the Mumbai region.
Here is the full error stack:
2017-01-13 13:14:22 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, master): java.io.IOException: s3n://bucket-name : 400 : Bad Request
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:115)
at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader.apply(RedshiftFileFormat.scala:92)
at com.databricks.spark.redshift.RedshiftFileFormat$$anonfun$buildReader.apply(RedshiftFileFormat.scala:80)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(fileSourceInterfaces.scala:279)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon.apply(fileSourceInterfaces.scala:263)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:91)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
... 30 more
Here is my Java code:
SparkContext sparkContext = SparkSession.builder().appName("CreditModeling").getOrCreate().sparkContext();
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", fs_s3a_awsAccessKeyId);
sparkContext.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", fs_s3a_awsSecretAccessKey);
sparkContext.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");
SQLContext sqlContext = new SQLContext(sparkContext);
Dataset dataset = sqlContext
.read()
.format("com.databricks.spark.redshift")
.option("url", redshiftUrl)
.option("query", query)
.option("aws_iam_role", aws_iam_role)
.option("tempdir", "s3a://bucket-name/temp-dir")
.load();
I was able to resolve the problem in Spark local mode with the following changes (with reference to this):
1) I replaced the jets3t jar with version 0.9.4
2) Changed the jets3t configuration properties to support AWS v4-signed buckets, as shown below:
Jets3tProperties myProperties = Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
But now, when I try to run the job in cluster mode (Spark standalone mode, or with Mesos as the resource manager), the error appears again :(
Any help would be appreciated!
That stack trace means you are using the older, jets3t-based s3n connector. The credentials you are setting apply only to the newer s3a one. Use a URL like s3a:// to pick up the new connector.
Given that you are trying to use the V4 API, you will also need to set fs.s3a.endpoint. The 400/Bad Request response is exactly what you see when you try to authenticate with v4 against the central endpoint.
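For reference, a minimal sketch of that configuration (assuming Hadoop 2.7+, where the s3a credential properties are fs.s3a.access.key and fs.s3a.secret.key; note that fs.s3a.impl is deliberately not set, so s3a:// URLs resolve to the real S3AFileSystem):

// Configure the s3a connector itself instead of remapping it onto the s3n one.
org.apache.hadoop.conf.Configuration hadoopConf = sparkContext.hadoopConfiguration();
hadoopConf.set("fs.s3a.access.key", fs_s3a_awsAccessKeyId);
hadoopConf.set("fs.s3a.secret.key", fs_s3a_awsSecretAccessKey);
// Point s3a at the regional endpoint so V4-signed requests target ap-south-1.
hadoopConf.set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com");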
Actual problem:
Updating the Jets3tProperties at runtime to support AWS S3 signature version 4 worked in local mode but not in cluster mode, because the properties were only being updated on the driver JVM, not on any of the executor JVMs.
Solution:
I found a workaround to update the Jets3tProperties on all executors by referring to this link.
Following that link, I added an extra snippet inside a .foreachPartition() call that updates the Jets3tProperties on whichever executor processes each partition.
Here is the code:
Dataset dataset = sqlContext
    .read()
    .format("com.databricks.spark.redshift")
    .option("url", redshiftUrl)
    .option("query", query)
    .option("aws_iam_role", aws_iam_role)
    .option("tempdir", "s3a://bucket-name/temp-dir")
    .load();

dataset.foreachPartition(partition -> {
    // This body executes on the executor JVM that processes the partition,
    // so the JetS3t properties get updated where the S3 reads actually happen.
    boolean first = true;
    if (first) {
        Jets3tProperties myProperties =
                Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
        myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
        myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
        myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
        first = false;
    }
});
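Note that, as written, the first flag does nothing: it is re-initialized for every partition, so the (idempotent) property update runs once per partition. If you want it to run only once per executor JVM, one option is to move the snippet behind a static guard. A minimal sketch, not from the original post; the Jets3tConfigurator class name is hypothetical:

import java.util.concurrent.atomic.AtomicBoolean;

import org.jets3t.service.Constants;
import org.jets3t.service.Jets3tProperties;

// Hypothetical helper: the static flag lives in each executor JVM, so the
// properties are applied at most once per JVM rather than once per partition.
public class Jets3tConfigurator {
    private static final AtomicBoolean CONFIGURED = new AtomicBoolean(false);

    public static void configure() {
        if (CONFIGURED.compareAndSet(false, true)) {
            Jets3tProperties myProperties =
                    Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME);
            myProperties.setProperty("s3service.s3-endpoint", "s3.ap-south-1.amazonaws.com");
            myProperties.setProperty("storage-service.request-signature-version", "AWS4-HMAC-SHA256");
            myProperties.setProperty("uploads.stream-retry-buffer-size", "2147483646");
        }
    }
}

Calling Jets3tConfigurator.configure() at the top of the same .foreachPartition() body would then replace the boolean guard above.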