通过 Spark(或更好:pyspark)在本地读取 S3 文件

Locally reading S3 files through Spark (or better: pyspark)

我想通过 Spark(实际上是 pyspark)从我的(本地)机器读取一个 S3 文件。现在,我不断收到

之类的身份验证错误

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

我在这里和网上到处查看,尝试了很多东西,但显然 S3 在过去一年或几个月里一直在变化,所有方法都失败了,但只有一个:

pyspark.SparkContext().textFile("s3n://user:password@bucket/key")

(注意 s3n [s3 无效])。现在,我不想对用户和密码使用 URL,因为它们会出现在日志中,而且我也不确定如何从 ~/.aws/credentials 文件中获取它们。

那么,我如何使用现在 standard ~/.aws/credentials 文件中的 AWS 凭据通过 Spark(或者更好的是 pyspark)从 S3 本地读取(理想情况下,无需将那里的凭据复制到另一个配置文件)?

PS:我试过os.environ["AWS_ACCESS_KEY_ID"] = …os.environ["AWS_SECRET_ACCESS_KEY"] = …,没用。

PPS:我不知道去哪里 "set the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties"(Google 什么也没想到)。但是,我确实尝试了多种设置方法:SparkContext.setSystemProperty()sc.setLocalProperty()conf = SparkConf(); conf.set(…); conf.set(…); sc = SparkContext(conf=conf)。什么都没用。

是的,您必须使用 s3n 而不是 s3s3 是对 S3 的一些奇怪滥用,我不清楚它的好处。

您可以将凭据传递给 sc.hadoopFilesc.newAPIHadoopFile 调用:

rdd = sc.hadoopFile('s3n://my_bucket/my_file', conf = {
  'fs.s3n.awsAccessKeyId': '...',
  'fs.s3n.awsSecretAccessKey': '...',
})

关于您必须提供给 hadoopFile 函数的 java 个对象,我不能说太多,只是这个函数似乎已经被一些 "newAPIHadoopFile" 弃用了。关于此的文档非常粗略,我觉得您需要了解 Scala/Java 才能真正深入了解一切的含义。 与此同时,我想出了如何将一些 s3 数据实际导入 pyspark,我想我会分享我的发现。 本文档:Spark API documentation says that it uses a dict that gets converted into a java configuration (XML). I found the configuration for java, this should probably reflect the values you should put into the dict:

bucket = "mycompany-mydata-bucket"
prefix = "2015/04/04/mybiglogfile.log.gz"
filename = "s3n://{}/{}".format(bucket, prefix)

config_dict = {"fs.s3n.awsAccessKeyId":"FOOBAR",
               "fs.s3n.awsSecretAccessKey":"BARFOO"}

rdd = sc.hadoopFile(filename,
                    'org.apache.hadoop.mapred.TextInputFormat',
                    'org.apache.hadoop.io.Text',
                    'org.apache.hadoop.io.LongWritable',
                    conf=config_dict)

此代码片段从前两行指定的存储桶和前缀(存储桶中的文件路径)加载文件。

问题实际上是亚马逊botoPython模块中的错误。问题与MacPort的版本实际上是旧的事实有关:通过pip安装boto解决了问题:~/.aws/credentials被正确读取。

现在我有了更多的经验,我会说一般来说(截至 2015 年底)Amazon Web Services 工具和 Spark/PySpark 有一个不完整的文档并且可能有一些很容易出现的严重错误到运行成。对于第一个问题,我建议首先更新 aws 命令行界面,boto 每次发生奇怪的事情时更新 Spark:这已经 "magically" 解决了一些问题我已经有问题了。

环境变量设置可能有所帮助。

Here in Spark FAQ 在问题 "How can I access data in S3?" 下,他们建议设置 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 环境变量。

这是一个关于如何从 ~/.aws/credentials 读取凭据的解决方案。它利用了凭证文件是一个 INI 文件这一事实,可以使用 Python 的 configparser 对其进行解析。

import os
import configparser

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))

aws_profile = 'default' # your AWS profile to use

access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key") 

另请参阅 https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0 中的要点。