使用来自 s3 的火花加载 json 时错误的 FS

Wrong FS on loading json with spark from s3

我正在尝试使用 spark 和 magellan library 加载 geojson 文件 我的加载代码是:

val polygons = spark.read.format("magellan").option("type", "geojson").load(inJson)

inJson 是我在 s3 上的 json 的路径: s3n://bucket-name/geojsons/file.json

堆栈跟踪错误:

0.3 in stage 0.0 (TID 3, ip-172-31-19-102.eu-west-1.compute.internal, executor 1): java.lang.IllegalArgumentException: Wrong FS: s3n://bucket-name/geojsons/file.json, expected: hdfs://ip-172-31-27-182.eu-west-1.compute.internal:8020 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:653) at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) at org.apache.hadoop.hdfs.DistributedFileSystem.access[=12=]0(DistributedFileSystem.java:106) at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:304) at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:299) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773) at magellan.mapreduce.WholeFileReader.nextKeyValue(WholeFileReader.scala:45) at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:199) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439) at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212) at scala.collection.AbstractIterator.fold(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086) at org.apache.spark.rdd.RDD$$anonfun$fold$$anonfun.apply(RDD.scala:1086) at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1980) at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1980) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

只有当我 运行 它在不止一台机器上时才会出现问题,所以它在核心组中有 master 和 1 个实例的 EMR 集群上工作正常,但在核心组中有 10 个实例时失败

如果您 运行 使用 EMR,您可以只使用 "s3://bucket/path" 而不是 "s3n://...."

这原来是麦哲伦内部的问题WholeFileReader。它正在获取默认文件系统。

this pull request

解决了

解决方案是这样的:

-      val fs = FileSystem.get(conf)
+      val fs = path.getFileSystem(conf)