Spark 使用 sc.textFile ("s3n://...) 从 S3 读取文件
Spark read file from S3 using sc.textFile ("s3n://...)
正在尝试使用 spark-shell 读取位于 S3 中的文件:
scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12
scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
... etc ...
IOException:方案没有文件系统:s3n 错误发生在:
- 开发机器上的 Spark 1.31 或 1.40(无 Hadoop 库)
- 运行 来自 Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60),它集成了开箱即用的 Spark 1.2.1
- 使用 s3:// 或 s3n:// 方案
这个错误的原因是什么?缺少依赖项、缺少配置或误用 sc.textFile()
?
或者这可能是由于一个影响特定于 Hadoop 2.60 的 Spark 构建的错误,正如这个 post 似乎暗示的那样。我将尝试使用 Spark for Hadoop 2.40 看看是否可以解决问题。
S3N 不是默认的文件格式。您需要使用具有用于 AWS 兼容性的附加库的 Hadoop 版本来构建您的 Spark 版本。我在这里找到的其他信息,https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce
确认这与针对 Hadoop 2.60 的 Spark 构建有关。刚刚安装 Spark 1.4.0 "Pre built for Hadoop 2.4 and later"(而不是 Hadoop 2.6)。现在代码可以正常工作了。
sc.textFile("s3n://bucketname/Filename")
现在引发另一个错误:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
下面的代码使用了S3 URL格式来表示Spark可以读取S3文件。使用开发机器(无 Hadoop 库)。
scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
scala> lyrics.count
res1: Long = 9
甚至更好:如果 AWS 密钥具有前向“/”,则上面在 S3N URI 中内联 AWS 凭据的代码将会中断。在 SparkContext 中配置 AWS Credentials 将修复它。无论 S3 文件是 public 还是私有文件,代码都有效。
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
这是一个示例 spark 代码,可以读取 s3 上存在的文件
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)
您可能必须使用 s3a:/ 方案而不是 s3:/ 或 s3n:/
但是,对于 spark shell,它并不是开箱即用的(对我来说)。我看到以下堆栈跟踪:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
... 68 more
我认为 - 您必须手动手动添加 hadoop-aws 依赖项 http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar 但我不知道如何将其正确添加到 spark-shell。
对于 Spark 1。4.x "Pre built for Hadoop 2.6 and later":
我刚刚从 hadoop-aws-2.6.0.jar 复制了需要的 S3、S3native 包到
spark-assembly-1.4.1-hadoop2.6.0.jar.
之后我重新启动了 spark cluster,它工作了。
不要忘记检查组装罐的所有者和模式。
您可以使用适当的 jar 添加 --packages 参数:
您的提交:
bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
尽管这个问题已经有一个公认的答案,但我认为仍然缺少发生这种情况的确切细节。所以我认为可能还有一个地方可以回答。
如果您添加所需的 hadoop-aws 依赖项,您的代码应该可以工作。
从 Hadoop 2.6.0 开始,s3 FS 连接器已移至名为 hadoop-aws 的单独库。
还有一个 Jira:
Move s3-related FS connector code to hadoop-aws.
这意味着针对 Hadoop 2.6.0 或更高版本构建的任何版本的 spark 都必须使用另一个外部依赖项才能连接到 S3 文件系统。
这是我尝试过的一个 sbt 示例,它使用针对 Hadoop 2.6.0 构建的 Apache Spark 1.6.2 按预期工作:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"
就我而言,我遇到了一些依赖性问题,所以我通过添加排除来解决:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")
关于其他相关说明,我还没有尝试过,但建议使用 "s3a" 而不是 "s3n" 启动 Hadoop 2.6.0 的文件系统。
The third generation, s3a: filesystem. Designed to be a switch in replacement for s3n:, this filesystem binding supports larger files and promises higher performance.
有一个 Spark JIRA,SPARK-7481,从今天(2016 年 10 月 20 日)开始,添加一个 spark-cloud 模块,其中包括对所有 s3a 和 azure wasb 的传递依赖:需要,以及测试.
和一个Spark PR相匹配。这就是我如何在我的 spark 构建中获得 s3a 支持
如果您手动执行此操作,则必须获得与您的其余 hadoop JARS 具有的确切版本的 hadoop-aws JAR,以及 100% 与编译 Hadoop aws 的内容同步的 AWS JAR 版本。对于 Hadoop 2.7。{1, 2, 3, ...}
hadoop-aws-2.7.x.jar
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar
将所有这些粘贴到 SPARK_HOME/jars
。 运行 使用您在 Env vars 或 spark-default.conf
中设置的凭据引发火花
最简单的测试是你能计算 CSV 文件的行数吗
val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()
取号:一切安好。获取堆栈跟踪。坏消息。
运行 在 Spark 2.0.2 中遇到同样的问题。通过给它喂罐子来解决它。这是我 运行:
$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar
scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")
显然,您需要将 jar 放在 运行 spark-shell 来自
的路径中
使用 s3a 而不是 s3n。我在 Hadoop 工作中遇到了类似的问题。从 s3n 切换到 s3a 后它起作用了。
例如
s3a://myBucket/myFile1.log
我遇到了同样的问题。在设置 fs.s3n.impl 的值并添加 hadoop-aws 依赖项后,它工作正常。
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
我必须将 jar 文件从 hadoop 下载复制到 $SPARK_HOME/jars
目录中。为 spark-submit 使用 --jars
标志或 --packages
标志无效。
详情:
- Spark 2.3.0
- 下载的 Hadoop 是 2.7.6
- 复制的两个 jar 文件来自
(hadoop dir)/share/hadoop/tools/lib/
- aws-java-sdk-1.7.4.jar
- hadoop-aws-2.7.6.jar
- 从 maven repository 下载与您的 hadoop 版本匹配的
hadoop-aws
jar。
- 将 jar 复制到
$SPARK_HOME/jars
位置。
现在在您的 Pyspark 脚本中,设置 AWS 访问密钥和秘密访问密钥。
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")
// where spark is SparkSession instance
对于 Spark scala:
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")
正在尝试使用 spark-shell 读取位于 S3 中的文件:
scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12
scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
... etc ...
IOException:方案没有文件系统:s3n 错误发生在:
- 开发机器上的 Spark 1.31 或 1.40(无 Hadoop 库)
- 运行 来自 Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60),它集成了开箱即用的 Spark 1.2.1
- 使用 s3:// 或 s3n:// 方案
这个错误的原因是什么?缺少依赖项、缺少配置或误用 sc.textFile()
?
或者这可能是由于一个影响特定于 Hadoop 2.60 的 Spark 构建的错误,正如这个 post 似乎暗示的那样。我将尝试使用 Spark for Hadoop 2.40 看看是否可以解决问题。
S3N 不是默认的文件格式。您需要使用具有用于 AWS 兼容性的附加库的 Hadoop 版本来构建您的 Spark 版本。我在这里找到的其他信息,https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce
确认这与针对 Hadoop 2.60 的 Spark 构建有关。刚刚安装 Spark 1.4.0 "Pre built for Hadoop 2.4 and later"(而不是 Hadoop 2.6)。现在代码可以正常工作了。
sc.textFile("s3n://bucketname/Filename")
现在引发另一个错误:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
下面的代码使用了S3 URL格式来表示Spark可以读取S3文件。使用开发机器(无 Hadoop 库)。
scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
scala> lyrics.count
res1: Long = 9
甚至更好:如果 AWS 密钥具有前向“/”,则上面在 S3N URI 中内联 AWS 凭据的代码将会中断。在 SparkContext 中配置 AWS Credentials 将修复它。无论 S3 文件是 public 还是私有文件,代码都有效。
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
这是一个示例 spark 代码,可以读取 s3 上存在的文件
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)
您可能必须使用 s3a:/ 方案而不是 s3:/ 或 s3n:/ 但是,对于 spark shell,它并不是开箱即用的(对我来说)。我看到以下堆栈跟踪:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
... 68 more
我认为 - 您必须手动手动添加 hadoop-aws 依赖项 http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar 但我不知道如何将其正确添加到 spark-shell。
对于 Spark 1。4.x "Pre built for Hadoop 2.6 and later":
我刚刚从 hadoop-aws-2.6.0.jar 复制了需要的 S3、S3native 包到 spark-assembly-1.4.1-hadoop2.6.0.jar.
之后我重新启动了 spark cluster,它工作了。 不要忘记检查组装罐的所有者和模式。
您可以使用适当的 jar 添加 --packages 参数: 您的提交:
bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
尽管这个问题已经有一个公认的答案,但我认为仍然缺少发生这种情况的确切细节。所以我认为可能还有一个地方可以回答。
如果您添加所需的 hadoop-aws 依赖项,您的代码应该可以工作。
从 Hadoop 2.6.0 开始,s3 FS 连接器已移至名为 hadoop-aws 的单独库。 还有一个 Jira: Move s3-related FS connector code to hadoop-aws.
这意味着针对 Hadoop 2.6.0 或更高版本构建的任何版本的 spark 都必须使用另一个外部依赖项才能连接到 S3 文件系统。
这是我尝试过的一个 sbt 示例,它使用针对 Hadoop 2.6.0 构建的 Apache Spark 1.6.2 按预期工作:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"
就我而言,我遇到了一些依赖性问题,所以我通过添加排除来解决:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")
关于其他相关说明,我还没有尝试过,但建议使用 "s3a" 而不是 "s3n" 启动 Hadoop 2.6.0 的文件系统。
The third generation, s3a: filesystem. Designed to be a switch in replacement for s3n:, this filesystem binding supports larger files and promises higher performance.
有一个 Spark JIRA,SPARK-7481,从今天(2016 年 10 月 20 日)开始,添加一个 spark-cloud 模块,其中包括对所有 s3a 和 azure wasb 的传递依赖:需要,以及测试.
和一个Spark PR相匹配。这就是我如何在我的 spark 构建中获得 s3a 支持
如果您手动执行此操作,则必须获得与您的其余 hadoop JARS 具有的确切版本的 hadoop-aws JAR,以及 100% 与编译 Hadoop aws 的内容同步的 AWS JAR 版本。对于 Hadoop 2.7。{1, 2, 3, ...}
hadoop-aws-2.7.x.jar
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar
将所有这些粘贴到 SPARK_HOME/jars
。 运行 使用您在 Env vars 或 spark-default.conf
最简单的测试是你能计算 CSV 文件的行数吗
val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()
取号:一切安好。获取堆栈跟踪。坏消息。
运行 在 Spark 2.0.2 中遇到同样的问题。通过给它喂罐子来解决它。这是我 运行:
$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar
scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")
显然,您需要将 jar 放在 运行 spark-shell 来自
的路径中使用 s3a 而不是 s3n。我在 Hadoop 工作中遇到了类似的问题。从 s3n 切换到 s3a 后它起作用了。
例如
s3a://myBucket/myFile1.log
我遇到了同样的问题。在设置 fs.s3n.impl 的值并添加 hadoop-aws 依赖项后,它工作正常。
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
我必须将 jar 文件从 hadoop 下载复制到 $SPARK_HOME/jars
目录中。为 spark-submit 使用 --jars
标志或 --packages
标志无效。
详情:
- Spark 2.3.0
- 下载的 Hadoop 是 2.7.6
- 复制的两个 jar 文件来自
(hadoop dir)/share/hadoop/tools/lib/
- aws-java-sdk-1.7.4.jar
- hadoop-aws-2.7.6.jar
- 从 maven repository 下载与您的 hadoop 版本匹配的
hadoop-aws
jar。 - 将 jar 复制到
$SPARK_HOME/jars
位置。
现在在您的 Pyspark 脚本中,设置 AWS 访问密钥和秘密访问密钥。
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")
// where spark is SparkSession instance
对于 Spark scala:
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "ACCESS_KEY")
spark.sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESSS_KEY")