Dynamically read changing filename key

I have parquet files generated by Spark, and the filename (key) in S3 changes after every ETL job run. This is the code I currently use to read a parquet file via boto3 in SageMaker. I'm looking for a way to read the S3 filename (key) dynamically, because hardcoding the key makes the read fail since the key changes each time. How can I achieve this? Thanks.

import io

import boto3
import pandas as pd

filename = "datasets/randomnumbergenerator.parquet"  # hardcoded key that changes after every ETL run
bucketName = "bucket-name"

buffer = io.BytesIO()
client = boto3.resource("s3")
obj = client.Object(bucketName, filename)
obj.download_fileobj(buffer)
df = pd.read_parquet(buffer)
import fnmatch

import boto3

bucketName = "bucket-name"

client = boto3.resource("s3")
bucket = client.Bucket(bucketName)

# List every object in the bucket and keep only the parquet files under "datasets/"
for bucket_object in bucket.objects.all():
    if fnmatch.fnmatch(bucket_object.key, 'datasets/*.parquet'):
        print(bucket_object.key)

You need to get the list of objects from the bucket, and then you can iterate over them to find the object you are looking for.

Update: you can use fnmatch inside the loop to narrow the results down to what you do know about the file (the code above assumes it sits in a folder called "datasets" and has the parquet extension). This will return every matching object in the bucket.
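
If the goal is to go from the matched key straight to a DataFrame, the listing loop above can be combined with the download step from the question. Below is a minimal sketch under the assumption that only one key matches the pattern per ETL run; the helper name read_latest_parquet is purely illustrative:

import fnmatch
import io

import boto3
import pandas as pd

def read_latest_parquet(bucket_name, pattern='datasets/*.parquet'):
    """Download the first object whose key matches the pattern and load it into a DataFrame."""
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_name)
    for bucket_object in bucket.objects.all():
        if fnmatch.fnmatch(bucket_object.key, pattern):
            buffer = io.BytesIO()
            bucket.Object(bucket_object.key).download_fileobj(buffer)
            buffer.seek(0)
            return pd.read_parquet(buffer)
    raise FileNotFoundError(f"No key matching {pattern} in {bucket_name}")

df = read_latest_parquet("bucket-name")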

This solution worked for me.

import io
import os

import boto3
import pandas as pd
# pandas only needs one parquet engine; pyarrow (or fastparquet) is enough
import pyarrow
import fastparquet

def dynamically_read_filename_key(bucket, prefix='', suffix=''):
    # Region and credentials are read from environment variables
    s3 = boto3.client(
        "s3",
        region_name=os.environ['AWS_DEFAULT_REGION'],
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    )
    kwargs = {'Bucket': bucket}
    if isinstance(prefix, str):
        kwargs['Prefix'] = prefix
    resp = s3.list_objects_v2(**kwargs)
    # Return the first key that matches both the prefix and the suffix
    for obj in resp['Contents']:
        key = obj['Key']
        if key.startswith(prefix) and key.endswith(suffix):
            return key

filename = "".join(i for i in dynamically_read_filename_key\
                   (bucket="my-bucket",\
                    prefix="datasets/",\
                    suffix=".parquet"))

bucket = "my-bucket"

def parquet_read_filename_key(bucket, filename):
    client = boto3.resource(
        "s3",
        region_name=os.environ['AWS_DEFAULT_REGION'],
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    )
    # Download the object into an in-memory buffer and read it with pandas
    buffer = io.BytesIO()
    obj = client.Object(bucket, filename)
    obj.download_fileobj(buffer)
    df = pd.read_parquet(buffer)
    return df

df = parquet_read_filename_key(bucket, filename)
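
Since Spark normally writes its output as several part-*.parquet files under one prefix, a variant that loads every matching key and concatenates the pieces may match the ETL output more closely. This is only a sketch under that assumption; the helper read_all_parquet_parts and the use of a paginator are illustrative additions, not part of the original answer.

import io
import os

import boto3
import pandas as pd

def read_all_parquet_parts(bucket, prefix="datasets/", suffix=".parquet"):
    # Same credential handling as above, taken from environment variables
    s3 = boto3.client(
        "s3",
        region_name=os.environ['AWS_DEFAULT_REGION'],
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    )
    frames = []
    # Paginate so buckets with more than 1000 objects are handled as well
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith(suffix):
                buffer = io.BytesIO()
                s3.download_fileobj(bucket, key, buffer)
                buffer.seek(0)
                frames.append(pd.read_parquet(buffer))
    # Raises ValueError if nothing matched the prefix/suffix
    return pd.concat(frames, ignore_index=True)

df = read_all_parquet_parts("my-bucket", prefix="datasets/")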