How to concurrently list_objects in S3 bucket with aioboto3
I want to find all the unique paths in an S3 bucket, up to but not including the file level (i.e. the directory part of each key). The directories can have varying depths, so not all files sit at the same depth. For example, I might have these files:
data/subdir1/subdir2/file.csv
data/subdir1/subdir3/subdir4/subdir5/file2.csv
data/subdir6/subdir7/subdir8/file3.csv
and I want to get these directories back:
data/subdir1/subdir2/
data/subdir1/subdir3/subdir4/subdir5/
data/subdir6/subdir7/subdir8/
I am using the code below to get them. I use an async for loop with a paginator because I thought the pages would be processed concurrently, but I'm not sure they actually are. It seems slow, so I suspect they are still fetched one after another:
subfolders = set()
current_path = None
paginator = self.s3_client.get_paginator("list_objects")
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for file in result.get("Contents", []):
        current_path = os.path.dirname(file.get("Key"))
        if current_path not in subfolders:
            subfolders.add(current_path)
    print(f"Part Done")
return subfolders
My s3_client is an aioboto3 client.
Is there a way to speed up this process of finding and saving the directories?
NOTE: I realize this approach does not give me all the results, only those from the current paginator. Could I fetch the next paginator pages asynchronously?
I didn't find a way to parallelize over the objects returned for a single listing, but I did parallelize over many initial prefixes, like this:
subfolders = set()
prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]
try:
    for prefix_future in asyncio.as_completed(prefix_tasks):
        prefix_subfolders = await prefix_future
        subfolders.update(prefix_subfolders)
except KeyError as exc:
    print(f"Scanning origin bucket failed due to: {exc}")
    raise exc
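The prefixes list itself is not shown above; a minimal sketch of one way it could be built (the function name and arguments here are illustrative, not part of my actual code) is to list the first level of "directories" under a base prefix by asking S3 for CommonPrefixes with a "/" delimiter:

async def get_top_level_prefixes(s3_client, bucket: str, base_prefix: str) -> list:
    # Illustrative helper: collect the first level of "directories" under
    # base_prefix, e.g. "data/" -> ["data/subdir1/", "data/subdir6/"].
    prefixes = []
    paginator = s3_client.get_paginator("list_objects_v2")
    async for page in paginator.paginate(Bucket=bucket, Prefix=base_prefix, Delimiter="/"):
        for common_prefix in page.get("CommonPrefixes", []):
            prefixes.append(common_prefix["Prefix"])
    return prefixes

Each of the returned prefixes can then be scanned concurrently as shown above.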
My get_subfolders function is:
async def get_subfolders(self, bucket: str, prefix: str) -> Set[str]:
    subfolders = set()
    result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    objects = result.get("Contents", [])
    subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    # Use the next continuation token to paginate through truncated results.
    while result["IsTruncated"]:
        result = await self.s3_client.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix,
            ContinuationToken=result["NextContinuationToken"],
        )
        objects = result.get("Contents", [])
        subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))

    return subfolders
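For comparison, the same per-prefix scan could also be written with the async paginator from my first attempt, letting it handle the ContinuationToken bookkeeping; a minimal sketch (the method name is illustrative), assuming the same self.s3_client and depth filter:

async def get_subfolders_paginated(self, bucket: str, prefix: str) -> Set[str]:
    # Sketch only: pages for a single prefix are still fetched one after another;
    # the speedup comes from scanning many prefixes concurrently.
    subfolders = set()
    paginator = self.s3_client.get_paginator("list_objects_v2")
    async for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = page.get("Contents", [])
        subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))
    return subfolders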
My get_paths_by_depth() function is:
async def get_paths_by_depth(self, objects: List[dict], depth: int) -> Set[str]:
    subfolders = set()
    current_path = None
    try:
        # Keep only paths whose depth equals 'depth' levels.
        for bucket_object in objects:
            current_path = os.path.dirname(bucket_object["Key"])
            if current_path.count("/") == depth:
                subfolders.add(current_path)
    except Exception as exc:
        print(f"Getting subfolders failed due to error: {exc}")
        raise exc
    return subfolders
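Putting it together, here is a minimal, hypothetical sketch of how the client and the per-prefix tasks could be wired up. The class name SubfolderScanner, the bucket name and the example prefixes are assumptions for illustration, not part of my actual code, and it assumes a recent aioboto3 where clients are created from a Session as an async context manager:

import asyncio
import aioboto3

class SubfolderScanner:
    # Hypothetical wrapper: the get_subfolders and _get_paths_by_depth methods
    # shown above are assumed to be defined on this class.
    def __init__(self, s3_client):
        self.s3_client = s3_client

async def scan_bucket(bucket: str, prefixes: list) -> set:
    subfolders = set()
    session = aioboto3.Session()
    async with session.client("s3") as s3_client:
        scanner = SubfolderScanner(s3_client)
        # One coroutine per prefix; asyncio.gather runs them concurrently.
        tasks = [scanner.get_subfolders(bucket, prefix) for prefix in prefixes]
        for prefix_subfolders in await asyncio.gather(*tasks):
            subfolders.update(prefix_subfolders)
    return subfolders

# Example usage (bucket name and prefixes are illustrative):
# asyncio.run(scan_bucket("my-bucket", ["data/subdir1/", "data/subdir6/"]))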