How to get ONLY bottom level sub-folders from amazon S3 with aioboto3 fast and asynchronously
I have asked a similar question before, and others have asked similar ones too, but this one is more specific. I can get all sub-folders at any depth from S3 with the boto3 client (or aioboto3 for async code), but it is very slow: it returns every object to me, and I then filter those objects with code like this:
subfolders = set()
prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]
try:
    for prefix_future in asyncio.as_completed(prefix_tasks):
        prefix_subfolders = await prefix_future
        subfolders.update(prefix_subfolders)
except KeyError as exc:
    print(f"Scanning origin bucket failed due to: {exc}")
    raise exc
My get_subfolders function is:
async def get_subfolders(self, bucket: str, prefix: str) -> Set[str]:
    subfolders = set()
    result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    # Default to an empty list so prefixes with no matching objects don't raise.
    objects = result.get("Contents", [])
    subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    # Use the next continuation token to paginate through truncated results.
    while result["IsTruncated"]:
        result = await self.s3_client.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix,
            ContinuationToken=result["NextContinuationToken"],
        )
        objects = result.get("Contents", [])
        subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    return subfolders
My _get_paths_by_depth() function is:
async def _get_paths_by_depth(self, objects: List[dict], depth: int) -> Set[str]:
    subfolders = set()
    current_path = None
    try:
        # Keep only paths whose depth is exactly 'depth' levels.
        for bucket_object in objects:
            current_path = os.path.dirname(bucket_object["Key"])
            if current_path.count("/") == depth:
                subfolders.add(current_path)
    except Exception as exc:
        print(f"Getting subfolders failed due to error: {exc}")
        raise exc
    return subfolders
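For reference, self.s3_client in the snippets above is just an async S3 client; with recent aioboto3 versions it comes from a Session and is used as an async context manager. This is only a minimal, self-contained sketch of that setup (the bucket and prefix names are placeholders, not my actual values):

import asyncio

import aioboto3


async def list_keys(bucket: str, prefix: str) -> None:
    # aioboto3 hands out async clients from a Session; the client is an
    # async context manager and its API mirrors the boto3 S3 client.
    session = aioboto3.Session()
    async with session.client("s3") as s3_client:
        result = await s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        for obj in result.get("Contents", []):
            print(obj["Key"])


# asyncio.run(list_keys("my-bucket", "prefix/"))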
Is there any way to speed this up? I really want to avoid bringing back every file and then filtering the paths afterwards. Can I ask for paths of a specific length right away?
My file structure looks like this:
prefix/subfolder1/subfolder2/subfolder3/file1.txt
prefix/subfolder1/subfolder2/subfolder3/file2.json
prefix/subfolder4/subfolder5/file3.json
prefix/subfolder6/subfolder7/subfolder8/
and I only want the paths that end in at least one file, so in the case above I want to end up with:
prefix/subfolder1/subfolder2/subfolder3/
prefix/subfolder4/subfolder5/
So far, with the code I posted in the question, I was going through every single file in the bucket and saving its path in a set. That worked, but it took far too long.
A much faster way is to use the Delimiter parameter of the S3 request. Specifically, I changed the delimiter of the s3_client response to ".", which makes it return the CommonPrefixes for everything in the bucket that contains a ".". Since all of my files contain a ".", I get all the common prefixes with a single request instead of inspecting every file. The new code looks like this:
async def get_subfolders(
    self, bucket: str, prefix: str, delimiter: str = "."
) -> Set[str]:
    subfolders = set()
    foldername = None
    try:
        paginator = self.s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(
            Bucket=bucket, Prefix=prefix, Delimiter=delimiter
        ):
            for obj in result.get("CommonPrefixes", []):
                foldername = os.path.dirname(obj["Prefix"])
                # Keep only paths with depth greater than or equal to S3_FOLDERS_PATH_DEPTH.
                if foldername.count("/") >= S3_FOLDERS_PATH_DEPTH:
                    subfolders.add(foldername)
    except Exception as exc:
        print(f"Getting subfolders failed due to error: {exc}")
        raise exc
    return subfolders
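For anyone who wants to try the same idea outside of my class, here is a small, self-contained sketch (the bucket name, prefix and S3_FOLDERS_PATH_DEPTH value are placeholders I made up for the example). With Delimiter="." each CommonPrefixes entry is a key cut at the first "." after the prefix, so os.path.dirname() turns it back into the folder path, while keys without a "." (like the empty prefix/subfolder6/subfolder7/subfolder8/ marker above) stay in Contents and are skipped:

import asyncio
import os
from typing import Set

import aioboto3

S3_FOLDERS_PATH_DEPTH = 2  # placeholder; pick whatever minimum depth fits your layout


async def bottom_level_folders(bucket: str, prefix: str) -> Set[str]:
    subfolders: Set[str] = set()
    session = aioboto3.Session()
    async with session.client("s3") as s3_client:
        paginator = s3_client.get_paginator("list_objects")
        async for page in paginator.paginate(
            Bucket=bucket, Prefix=prefix, Delimiter="."
        ):
            # e.g. "prefix/subfolder1/subfolder2/subfolder3/file1." becomes
            # "prefix/subfolder1/subfolder2/subfolder3" after dirname().
            for common_prefix in page.get("CommonPrefixes", []):
                folder = os.path.dirname(common_prefix["Prefix"])
                if folder.count("/") >= S3_FOLDERS_PATH_DEPTH:
                    subfolders.add(folder)
    return subfolders


# print(asyncio.run(bottom_level_folders("my-bucket", "prefix/")))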