How to connect S3 to pyspark locally (org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3")

Spark version = 3.2.1

Hadoop version = 3.3.1

I have followed all the posts on Stack Overflow but cannot get this to run. I am new to Spark and am trying to read a JSON file.

On my local Mac I have Homebrew and pyspark installed. I just downloaded the jars; do I need to save them somewhere?

The jars downloaded are:

    hadoop-aws-3.3.1
    aws-java-sdk-bundle-1.12.172

I saved them in /usr/local/Cellar/apache-spark/3.2.1/libexec/jars.
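As a quick sanity check (a minimal sketch, assuming the Homebrew jar directory above), the jars can be listed from Python:

import glob

# list the AWS-related jars in the Spark jars directory
# (path assumes the Homebrew install location mentioned above)
jar_dir = "/usr/local/Cellar/apache-spark/3.2.1/libexec/jars"
for jar in sorted(glob.glob(jar_dir + "/*aws*.jar")):
    print(jar)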

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

access_id = "A*"
access_key = "C*"

# set Spark properties
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1')

sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", access_id)
spark._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark.read.json("s3://pt/raw/Deal_20220114.json")
df.show()

Error:

 org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"

This is how I run it locally:

spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.1 test.py

Error:

 org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary(DataSource.scala:747)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:745)
        at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:577)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:245)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
        at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:405)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)

22/03/06 19:31:36 INFO SparkContext: Invoking stop() from shutdown hook
22/03/06 19:31:36 INFO SparkUI: Stopped Spark web UI at http://adityakxc5zmd6m.attlocal.net:4041
22/03/06 19:31:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/03/06 19:31:36 INFO MemoryStore: MemoryStore cleared
22/03/06 19:31:36 INFO BlockManager: BlockManager stopped
22/03/06 19:31:36 INFO BlockManagerMaster: BlockManagerMaster stopped
22/03/06 19:31:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/03/06 19:31:37 INFO SparkContext: Successfully stopped SparkContext
22/03/06 19:31:37 INFO ShutdownHookManager: Shutdown hook called
22/03/06 19:31:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/7q/r0xvmq6n4p55r6d8nx9gmd7c0000gr/T/spark-1e346a99-5d6f-498d-9bc4-ce8a3f951718
22/03/06 19:31:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/7q/r0xvmq6n4p55r6d8nx9gmd7c0000gr/T/spark-f2727124-4f4b-4ee3-a8c0-607e207a3a98/pyspark-b96bf92a-84e8-409e-b9df-48f303c57b70
22/03/06 19:31:37 INFO ShutdownHookManager: Deleting directory /private/var/folders/7q/r0xvmq6n4p55r6d8nx9gmd7c0000gr/T/spark-f2727124-4f4b-4ee3-a8c0-607e207a3a98

No FileSystem for scheme "s3"

You have not configured fs.s3.impl, so Spark does not know how to handle file paths that begin with s3://.

Using fs.s3a.impl is recommended instead, and with it you access files via s3a:// URLs.
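A minimal corrected version of the script from the question (a sketch: the bucket path and placeholder credentials are taken from the question; it uses the documented s3a credential property names fs.s3a.access.key and fs.s3a.secret.key rather than the awsAccessKeyId variants, and reads via the s3a:// scheme):

from pyspark.sql import SparkSession

access_id = "A*"   # placeholder credentials from the question
access_key = "C*"

spark = (
    SparkSession.builder
    .appName("s3a-read")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
    .getOrCreate()
)

hconf = spark._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", access_id)    # s3a property names
hconf.set("fs.s3a.secret.key", access_key)
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# s3a:// rather than s3://, since s3a is the scheme the S3A connector handles
df = spark.read.json("s3a://pt/raw/Deal_20220114.json")
df.show()

If s3:// paths must keep working, the scheme can be mapped to the same implementation with hconf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"), but s3a:// is the supported convention for Hadoop 3.x.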