How to get ONLY bottom level sub-folders from amazon S3 with aioboto3 fast and asynchronously

I have asked a similar question before, and others have asked similar ones, but this one is more specific. I can get all subfolders at any depth from S3 with the boto3 client (or aioboto3 for async code), but it is very slow: it returns every object in the bucket, and I then filter those objects with code like this:

    subfolders = set()
    prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]
    try:

        for prefix_future in asyncio.as_completed(prefix_tasks):
            prefix_subfolders = await prefix_future
            subfolders.update(prefix_subfolders)

    except KeyError as exc:
        print(f"Scanning origin bucket failed due to: {exc}")
        raise exc
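
For context, the snippet above assumes an async `s3_client` and a list of top-level `prefixes` already exist. A minimal sketch of how they might be set up with a recent aioboto3 version (the bucket and prefix names here are made up):

    import asyncio
    import aioboto3

    async def main() -> None:
        bucket = "my-bucket"        # assumed bucket name
        prefixes = ["prefix/"]      # assumed top-level prefixes to scan

        session = aioboto3.Session()
        # In aioboto3 the client is used as an async context manager.
        async with session.client("s3") as s3_client:
            ...  # run the as_completed loop above with this client

    asyncio.run(main())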

My get_subfolders function is:

    async def get_subfolders(self, bucket: str, prefix: str) -> Set[str]:

        subfolders = set()

        result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        objects = result.get("Contents")
        subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

        # Use the next continuation token to paginate through truncated results.
        while result["IsTruncated"]:
            result = await self.s3_client.list_objects_v2(
                Bucket=bucket,
                Prefix=prefix,
                ContinuationToken=result["NextContinuationToken"],
            )
            objects = result.get("Contents")
            subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

        return subfolders

My _get_paths_by_depth() function is:

    async def _get_paths_by_depth(self, objects: List[dict], depth: int) -> Set[str]:
        subfolders = set()
        current_path = None
        try:
            # Keep only paths whose depth equals 'depth' levels.
            for bucket_object in objects:
                current_path = os.path.dirname(bucket_object["Key"])
                if current_path.count("/") == depth:
                    subfolders.add(current_path)

        except Exception as exc:
            print(f"Getting subfolders failed due to error: {exc}")
            raise exc

        return subfolders
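
To make the depth check concrete: `os.path.dirname` strips the file name from the key, and counting `"/"` in what remains gives the folder depth. A quick illustration with a made-up key:

    import os

    key = "prefix/subfolder1/subfolder2/subfolder3/file1.txt"
    folder = os.path.dirname(key)  # "prefix/subfolder1/subfolder2/subfolder3"
    print(folder.count("/"))       # 3, so this key is kept when depth == 3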

Is there any way to speed this up? I would really like to avoid fetching every file and then filtering out the paths. Can I ask up front for only paths of a specific length?

My file structure looks like this:

    prefix/subfolder1/subfolder2/subfolder3/file1.txt
    prefix/subfolder1/subfolder2/subfolder3/file2.json
    prefix/subfolder4/subfolder5/file3.json
    prefix/subfolder6/subfolder7/subfolder8/

and I only want the paths that actually end in at least one file; in the case above I would want to end up with:

    prefix/subfolder1/subfolder2/subfolder3/
    prefix/subfolder4/subfolder5/

With the code I posted in the question, I am going through every file in the bucket and saving its path in a set. That works, but it takes far too long.

A much faster approach is to use the Delimiter parameter in the S3 request. Specifically, I set the delimiter to ".", which changes the s3_client response so that it includes a CommonPrefixes entry for every prefix in the bucket up to a ".". Since every file name contains a "." (in its extension), I get all the common prefixes in a single paginated listing instead of inspecting every file. The new code looks like this:

    async def get_subfolders(
        self, bucket: str, prefix: str, delimiter: str = "."
    ) -> Set[str]:

        subfolders = set()
        foldername = None
        try:
            paginator = self.s3_client.get_paginator("list_objects")
            async for result in paginator.paginate(
                Bucket=bucket, Prefix=prefix, Delimiter=delimiter
            ):
                for obj in result.get("CommonPrefixes", []):
                    foldername = os.path.dirname(obj["Prefix"])
                    # Keep only paths at least S3_FOLDERS_PATH_DEPTH levels deep.
                    if foldername.count("/") >= S3_FOLDERS_PATH_DEPTH:
                        subfolders.add(foldername)

        except Exception as exc:
            print(f"Getting subfolders failed due to error: {exc}")
            raise exc

        return subfolders
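
For completeness, a hedged usage sketch, assuming the method lives on a wrapper class (here called S3Scanner, a made-up name) that stores the s3_client created as above:

    import asyncio
    import aioboto3

    async def main() -> None:
        session = aioboto3.Session()
        async with session.client("s3") as s3_client:
            scanner = S3Scanner(s3_client)  # hypothetical class holding self.s3_client
            subfolders = await scanner.get_subfolders("my-bucket", "prefix/")
            print(sorted(subfolders))

    asyncio.run(main())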