IllegalArgumentException,从 s3 而不是 hdfs 指定 input/output 时错误的 FS
IllegalArgumentException, Wrong FS when specifying input/output from s3 instead of hdfs
我一直在 运行 我在本地集群上的 Spark 作业,该集群具有从中读取输入和写入输出的 hdfs。现在我已经设置了一个 AWS EMR 和一个 S3 存储桶,我有我的输入,我希望我的输出也写入 S3。
错误:
User class threw exception: java.lang.IllegalArgumentException: Wrong
FS: s3://something/input, expected:
hdfs://ip-some-numbers.eu-west-1.compute.internal:8020
我尝试搜索相同的问题,但有几个关于此问题的问题。有人建议它只用于输出,但即使我禁用输出,我也会得到同样的错误。
另一个建议是我的代码中 FileSystem
有问题。以下是 input/output 在我的程序中出现的所有情况:
第一次出现在我的习惯 FileInputFormat
中,在 getSplits(JobContext job)
中,我实际上并没有修改自己,但我可以:
FileSystem fs = path.getFileSystem(job.getConfiguration());
我自定义的类似案例RecordReader
,我自己也没有修改:
final FileSystem fs = file.getFileSystem(job);
在我自己编写的自定义 RecordReader
的 nextKeyValue()
中,我使用:
FileSystem fs = FileSystem.get(jc);
最后,当我想检测我使用的文件夹中的文件数时:
val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))
我认为问题出在我的代码上,但我如何修改 FileSystem
调用以支持来自 S3 的 input/output?
hadoop 文件系统 api 不提供开箱即用的 S3 支持。 S3 的 hadoop 文件系统 api 有两种实现:S3A 和 S3N。 S3A 似乎是首选实现。要使用它,您必须做一些事情:
- 将 aws-java-sdk-bundle.jar 添加到您的类路径中。
创建文件系统时,在文件系统的配置中包含以下属性的值:
fs.s3a.access.key
fs.s3a.secret.key
- 在 S3 上指定路径时,不要使用
s3://
,而是使用 s3a://
。
注意: 创建一个简单的用户并先尝试使用基本身份验证。可以让它与 AWS 更高级的临时凭证机制一起工作,但这有点复杂,我必须对文件系统代码进行一些更改才能在我尝试时让它工作。
信息来源是here
这就是我在 EMR 上启动 spark-job 时解决这个问题的方法:
val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)
确保将 s3_bucket 替换为您的存储桶名称
希望对大家有所帮助
尝试为文件系统设置默认 URI:
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s"s3a://$s3bucket"))
使用
指定密钥和机密后
fs.s3a.access.key
fs.s3a.secret.key
并按照说明获取文件系统:
val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)
我仍然会得到错误
java.lang.IllegalArgumentException: Wrong FS: s3a:// ... , expected: file:///
查看默认文件系统,可以查看上面创建的hdfs文件系统:
hadoopfs.getUri
对我来说仍然返回 file:///
为了让它正常工作,先于到运行FileSystem.get
,设置文件系统的默认URI。
val s3URI = s"s3a://$s3bucket"
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s3URI))
val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
EMR 配置为避免在代码或作业配置中使用密钥。
问题在于如何在您的示例中创建文件系统。
Hadoop 创建的默认文件系统是用于 hdfs 架构的文件系统。
因此,如果 path
模式是 s3://
,下一个代码将不起作用。
val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))
要创建正确的文件系统,您需要将路径与您将要使用的架构一起使用。例如,像这样:
val conf = sc.hadoopConfiguration
val pObj = new Path(path)
val status = pObj.getFileSystem(conf).listStatus(pObj)
来自 Hadoop 代码:
在FileSystem.get
中实施
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
使用Path.getFileSystem实现:
public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}
我一直在 运行 我在本地集群上的 Spark 作业,该集群具有从中读取输入和写入输出的 hdfs。现在我已经设置了一个 AWS EMR 和一个 S3 存储桶,我有我的输入,我希望我的输出也写入 S3。
错误:
User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://something/input, expected: hdfs://ip-some-numbers.eu-west-1.compute.internal:8020
我尝试搜索相同的问题,但有几个关于此问题的问题。有人建议它只用于输出,但即使我禁用输出,我也会得到同样的错误。
另一个建议是我的代码中 FileSystem
有问题。以下是 input/output 在我的程序中出现的所有情况:
第一次出现在我的习惯 FileInputFormat
中,在 getSplits(JobContext job)
中,我实际上并没有修改自己,但我可以:
FileSystem fs = path.getFileSystem(job.getConfiguration());
我自定义的类似案例RecordReader
,我自己也没有修改:
final FileSystem fs = file.getFileSystem(job);
在我自己编写的自定义 RecordReader
的 nextKeyValue()
中,我使用:
FileSystem fs = FileSystem.get(jc);
最后,当我想检测我使用的文件夹中的文件数时:
val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))
我认为问题出在我的代码上,但我如何修改 FileSystem
调用以支持来自 S3 的 input/output?
hadoop 文件系统 api 不提供开箱即用的 S3 支持。 S3 的 hadoop 文件系统 api 有两种实现:S3A 和 S3N。 S3A 似乎是首选实现。要使用它,您必须做一些事情:
- 将 aws-java-sdk-bundle.jar 添加到您的类路径中。
创建文件系统时,在文件系统的配置中包含以下属性的值:
fs.s3a.access.key fs.s3a.secret.key
- 在 S3 上指定路径时,不要使用
s3://
,而是使用s3a://
。
注意: 创建一个简单的用户并先尝试使用基本身份验证。可以让它与 AWS 更高级的临时凭证机制一起工作,但这有点复杂,我必须对文件系统代码进行一些更改才能在我尝试时让它工作。
信息来源是here
这就是我在 EMR 上启动 spark-job 时解决这个问题的方法:
val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)
确保将 s3_bucket 替换为您的存储桶名称
希望对大家有所帮助
尝试为文件系统设置默认 URI:
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s"s3a://$s3bucket"))
使用
指定密钥和机密后fs.s3a.access.key
fs.s3a.secret.key
并按照说明获取文件系统:
val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)
我仍然会得到错误
java.lang.IllegalArgumentException: Wrong FS: s3a:// ... , expected: file:///
查看默认文件系统,可以查看上面创建的hdfs文件系统:
hadoopfs.getUri
对我来说仍然返回 file:///
为了让它正常工作,先于到运行FileSystem.get
,设置文件系统的默认URI。
val s3URI = s"s3a://$s3bucket"
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s3URI))
val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
EMR 配置为避免在代码或作业配置中使用密钥。 问题在于如何在您的示例中创建文件系统。
Hadoop 创建的默认文件系统是用于 hdfs 架构的文件系统。
因此,如果 path
模式是 s3://
,下一个代码将不起作用。
val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))
要创建正确的文件系统,您需要将路径与您将要使用的架构一起使用。例如,像这样:
val conf = sc.hadoopConfiguration
val pObj = new Path(path)
val status = pObj.getFileSystem(conf).listStatus(pObj)
来自 Hadoop 代码:
在FileSystem.get
中实施 public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
使用Path.getFileSystem实现:
public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}