如何使用 aioboto3 在 S3 存储桶中同时 list_objects

How to concurrently list_objects in S3 bucket with aioboto3

我想找到 S3 存储桶中的所有唯一路径,我想要文件级别之前的所有路径。目录的深度可能会有所不同,因此并非所有文件都在相同的深度中找到,例如我可能有这些文件:

data/subdir1/subdir2/file.csv
data/subdir1/subdir3/subdir4/subdir5/file2.csv
data/subdir6/subdir7/subdir8/file3.csv

我想要这些目录:

data/subdir1/subdir2/
data/subdir1/subdir3/subdir4/subdir5/
data/subdir6/subdir7/subdir8/

我正在使用下面的代码来获取它们。我使用带分页器的 async for 循环,因为我虽然它们会被同时处理,但我不确定它们是。好像很慢 所以我想他们还是串联完成的:

    subfolders = set()
    current_path = None

    paginator = self.s3_client.get_paginator("list_objects")

    async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for file in result.get("Contents", []):
            current_path = os.path.dirname(file.get("Key"))
            if current_path not in subfolders:
                subfolders.add(current_path)
            print(f"Part Done")

    return subfolders

我的 s3_client 是 aioboto3 客户。

有没有办法加快查找和保存目录的过程?

注意: 我意识到这个方法并没有给我带来所有结果,只有来自当前分页器的结果我可以异步获取下一个分页器吗?

我没有找到对返回的对象进行并行化的方法,但我使用许多像这样的初始前缀进行了并行化:

    subfolders = set()
    prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]
    try:

        for prefix_future in asyncio.as_completed(prefix_tasks):
            prefix_subfolders = await prefix_future
            subfolders.update(prefix_subfolders)

    except KeyError as exc:
        print(f"Scanning origin bucket failed due to: {exc}")
        raise exc

我的 get_subfolders 函数是:

async def get_subfolders(self, bucket: str, prefix: str) -> List[str]:

    subfolders = set()

    result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    objects = result.get("Contents")
    subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    # Use next continuation token for pagination for truncated results.
    while result["IsTruncated"]:
        result = await self.s3_client.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix,
            ContinuationToken=result["NextContinuationToken"],
        )
        objects = result.get("Contents")
        subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    return subfolders

我的 get_paths_by_depth() 函数是:

    async def get_paths_by_depth(self, objects: dict, depth: int) -> Set[str]:
    subfolders = set()
    current_path = None
    try:
        # Get only paths with depth equal to 'depth' levels
        for bucket_object in objects:
            current_path = os.path.dirname(bucket_object["Key"])
            if current_path.count("/") == depth:
                subfolders.add(current_path)

    except Exception as exc:
        print(f"Getting subfolders failed due to error: {exc}")
        raise exc

    return subfolders