IllegalArgumentException,从 s3 而不是 hdfs 指定 input/output 时错误的 FS

IllegalArgumentException, Wrong FS when specifying input/output from s3 instead of hdfs

我一直在 运行 我在本地集群上的 Spark 作业,该集群具有从中读取输入和写入输出的 hdfs。现在我已经设置了一个 AWS EMR 和一个 S3 存储桶,我有我的输入,我希望我的输出也写入 S3。

错误:

User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://something/input, expected: hdfs://ip-some-numbers.eu-west-1.compute.internal:8020

我尝试搜索相同的问题,但有几个关于此问题的问题。有人建议它只用于输出,但即使我禁用输出,我也会得到同样的错误。

另一个建议是我的代码中 FileSystem 有问题。以下是 input/output 在我的程序中出现的所有情况:

第一次出现在我的习惯 FileInputFormat 中,在 getSplits(JobContext job) 中,我实际上并没有修改自己,但我可以:

FileSystem fs = path.getFileSystem(job.getConfiguration());

我自定义的类似案例RecordReader,我自己也没有修改:

final FileSystem fs = file.getFileSystem(job);

在我自己编写的自定义 RecordReadernextKeyValue() 中,我使用:

FileSystem fs = FileSystem.get(jc);

最后,当我想检测我使用的文件夹中的文件数时:

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

我认为问题出在我的代码上,但我如何修改 FileSystem 调用以支持来自 S3 的 input/output?

hadoop 文件系统 api 不提供开箱即用的 S3 支持。 S3 的 hadoop 文件系统 api 有两种实现:S3A 和 S3N。 S3A 似乎是首选实现。要使用它,您必须做一些事情:

  1. 将 aws-java-sdk-bundle.jar 添加到您的类路径中。
  2. 创建文件系统时,在文件系统的配置中包含以下属性的值:

    fs.s3a.access.key
    fs.s3a.secret.key
    
  3. 在 S3 上指定路径时,不要使用 s3://,而是使用 s3a://

注意: 创建一个简单的用户并先尝试使用基本身份验证。可以让它与 AWS 更高级的临时凭证机制一起工作,但这有点复杂,我必须对文件系统代码进行一些更改才能在我尝试时让它工作。

信息来源是here

这就是我在 EMR 上启动 spark-job 时解决这个问题的方法:

 val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

确保将 s3_bucket 替换为您的存储桶名称

希望对大家有所帮助

尝试为文件系统设置默认 URI:

FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s"s3a://$s3bucket"))

使用

指定密钥和机密后
fs.s3a.access.key
fs.s3a.secret.key

并按照说明获取文件系统:

val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

我仍然会得到错误

java.lang.IllegalArgumentException: Wrong FS: s3a:// ... , expected: file:///

查看默认文件系统,可以查看上面创建的hdfs文件系统: hadoopfs.getUri 对我来说仍然返回 file:///

为了让它正常工作,先于到运行FileSystem.get,设置文件系统的默认URI。

val s3URI = s"s3a://$s3bucket"
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s3URI))

val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

EMR 配置为避免在代码或作业配置中使用密钥。 问题在于如何在您的示例中创建文件系统。

Hadoop 创建的默认文件系统是用于 hdfs 架构的文件系统。

因此,如果 path 模式是 s3://,下一个代码将不起作用。

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

要创建正确的文件系统,您需要将路径与您将要使用的架构一起使用。例如,像这样:

val conf = sc.hadoopConfiguration
val pObj = new Path(path)
val status = pObj.getFileSystem(conf).listStatus(pObj)

来自 Hadoop 代码:

在FileSystem.get

中实施
   public static FileSystem get(Configuration conf) throws IOException {
      return get(getDefaultUri(conf), conf);
   }

使用Path.getFileSystem实现:

   public FileSystem getFileSystem(Configuration conf) throws IOException {
      return FileSystem.get(this.toUri(), conf);
   }