如何将 EMR Pyspark 生成的 Amazon S3 输出文件检索回 Flask

How to retrieve the output file from Amazon S3 generated from EMR Pyspark back into Flask

我目前正在尝试使用 pyspark 将我的 Flask 应用程序连接到 Amazon EMR。我将 AWS (https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-python-example_code-emr.html) 中的示例用于 pyspark。我使用以下代码输出文件:

df.write.mode('overwrite').csv('s3://my-bucket/output')

Amazon EMR 的输出文件存储在 Amazon S3 中,名称如下:

  1. part-00003-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv
  2. part-00007-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv
  3. part-00011-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv

我想将 CSV 文件读入我的 Flask 应用程序。由于文件名每次都不同,我该如何读取这些文件?有什么更聪明的方法吗?

我假设您正在尝试将它们读入一个数据帧。 (另外,根据您的评论,'part' 前缀将很常见)

s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')

prefix_objs = bucket.objects.filter(Prefix="output/part")

prefix_df = []

for obj in prefix_objs:
    try:
        key = obj.key
        body = obj.get()['Body'].read()
        temp = pd.read_csv(io.BytesIO(body),header=None, encoding='utf8',sep=',')        
        prefix_df.append(temp)
    except:
        continue

这将读取存储桶中 output 文件夹中前缀为 'part' 的所有文件并添加到数组中。

之后,您可以将其连接为

pd.concat(prefix_df)