从 s3 异步读取多个 "bulked" json。有没有更好的办法?
Reading multiple "bulked" jsons from s3 asynchronously. Is there a better way?
目标是尝试从 s3 加载大量 "bulked" json。我找到了 aiobotocore
并感到被催促去尝试,希望获得更高的效率,同时熟悉 asyncio
。我试了一下,它确实有效,但我对异步编程基本上一无所知。因此,我希望得到一些 improvements/comments。也许那里有一些好心人可以发现一些明显的错误。
问题是boto3一次只支持一个http请求。通过利用 Threadpool
,我取得了显着的改进,但我希望有一种更有效的方法。
代码如下:
进口:
import os
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE
我在某处找到的一些辅助生成器 return 从具有多个 json 的字符串中解码 json。
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
'''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
此函数从具有给定前缀的 s3 存储桶中获取密钥:
# Async stuff starts here
async def get_keys(loop, bucket, prefix):
'''Get keys in bucket based on prefix'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = []
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
keys.append(c['Key'])
return keys
此函数获取所提供密钥的内容。最重要的是,它将解码内容列表展平:
async def get_object(loop,bucket, key):
'''Get json content from s3 object'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
# get object from s3
response = await client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
这是主要功能,它收集所有找到的键的内容并展平内容列表。
async def go(loop, bucket, prefix):
'''Returns list of dicts of object contents'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = await get_keys(loop, bucket, prefix)
contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])
return list(chain.from_iterable(contents))
最后,我 运行 这个和字典的结果列表在 result
中很好地结束了
loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
我认为可能有点奇怪的一件事是我在每个异步函数中创建了一个客户端。大概是可以解除的吧。请注意 aiobotocore
如何与多个客户端一起工作。
此外,我认为您不需要在加载键的对象之前等待所有键都已加载,我认为在此实现中就是这种情况。我假设一旦找到密钥,您就可以调用 get_object
。所以,也许它应该是 async generator
。但我在这里并不完全清楚。
提前致谢!希望这对处于类似情况的人有所帮助。
先看看aioboto3
其次,aiobotocore 中的每个客户端都与一个 aiohttp 会话相关联。每个会话最多可以有 max_pool_connections. This is why in the basic aiobotocore example 它在 create_client
上执行 async with
。因此,当使用客户端完成时,池将关闭。
这里有一些提示:
- 您应该使用 work pool, created by me, modularized by CaliDog 以避免污染您的事件循环。使用此功能时,请将您的工作流程视为一个流。
- 这将避免您必须使用 asyncio.gather,这将在抛出第一个异常后将任务 运行 留在后台。
- 你应该一起调整你的工作循环大小和 max_pool_connections,并且只使用一个客户端来并行支持你想要(或者可以基于计算所需)的任务数量。
- 你真的不需要传递循环,因为现代 python 版本每个线程一个循环
- 您应该使用 aws 配置文件(Session init 的配置文件参数)/environment variables 这样您就不需要对密钥和区域信息进行硬编码。
基于以上,这里是我的做法:
import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial
# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config
_NUM_WORKERS = 50
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
# helper for parsing individual jsons from string of jsons (stolen from somewhere)
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
async def get_object(s3_client, bucket: str, key: str):
# Get json content from s3 object
# get object from s3
response = await s3_client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
async def go(bucket: str, prefix: str) -> List[dict]:
"""
Returns list of dicts of object contents
:param bucket: s3 bucket
:param prefix: s3 bucket prefix
:return: list of dicts of object contents
"""
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
session = aiobotocore.session.AioSession()
config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
contents = []
async with session.create_client('s3', config=config) as client:
worker_co = partial(get_object, client, bucket)
async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
contents.append(await work_pool.push(c['Key']))
# retrieve results from futures
contents = [c.result() for c in contents]
return list(chain.from_iterable(contents))
_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))
目标是尝试从 s3 加载大量 "bulked" json。我找到了 aiobotocore
并感到被催促去尝试,希望获得更高的效率,同时熟悉 asyncio
。我试了一下,它确实有效,但我对异步编程基本上一无所知。因此,我希望得到一些 improvements/comments。也许那里有一些好心人可以发现一些明显的错误。
问题是boto3一次只支持一个http请求。通过利用 Threadpool
,我取得了显着的改进,但我希望有一种更有效的方法。
代码如下:
进口:
import os
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE
我在某处找到的一些辅助生成器 return 从具有多个 json 的字符串中解码 json。
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
'''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
此函数从具有给定前缀的 s3 存储桶中获取密钥:
# Async stuff starts here
async def get_keys(loop, bucket, prefix):
'''Get keys in bucket based on prefix'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = []
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
keys.append(c['Key'])
return keys
此函数获取所提供密钥的内容。最重要的是,它将解码内容列表展平:
async def get_object(loop,bucket, key):
'''Get json content from s3 object'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
# get object from s3
response = await client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
这是主要功能,它收集所有找到的键的内容并展平内容列表。
async def go(loop, bucket, prefix):
'''Returns list of dicts of object contents'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = await get_keys(loop, bucket, prefix)
contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])
return list(chain.from_iterable(contents))
最后,我 运行 这个和字典的结果列表在 result
loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
我认为可能有点奇怪的一件事是我在每个异步函数中创建了一个客户端。大概是可以解除的吧。请注意
aiobotocore
如何与多个客户端一起工作。此外,我认为您不需要在加载键的对象之前等待所有键都已加载,我认为在此实现中就是这种情况。我假设一旦找到密钥,您就可以调用
get_object
。所以,也许它应该是async generator
。但我在这里并不完全清楚。
提前致谢!希望这对处于类似情况的人有所帮助。
先看看aioboto3
其次,aiobotocore 中的每个客户端都与一个 aiohttp 会话相关联。每个会话最多可以有 max_pool_connections. This is why in the basic aiobotocore example 它在 create_client
上执行 async with
。因此,当使用客户端完成时,池将关闭。
这里有一些提示:
- 您应该使用 work pool, created by me, modularized by CaliDog 以避免污染您的事件循环。使用此功能时,请将您的工作流程视为一个流。
- 这将避免您必须使用 asyncio.gather,这将在抛出第一个异常后将任务 运行 留在后台。
- 你应该一起调整你的工作循环大小和 max_pool_connections,并且只使用一个客户端来并行支持你想要(或者可以基于计算所需)的任务数量。
- 你真的不需要传递循环,因为现代 python 版本每个线程一个循环
- 您应该使用 aws 配置文件(Session init 的配置文件参数)/environment variables 这样您就不需要对密钥和区域信息进行硬编码。
基于以上,这里是我的做法:
import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial
# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config
_NUM_WORKERS = 50
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
# helper for parsing individual jsons from string of jsons (stolen from somewhere)
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
async def get_object(s3_client, bucket: str, key: str):
# Get json content from s3 object
# get object from s3
response = await s3_client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
async def go(bucket: str, prefix: str) -> List[dict]:
"""
Returns list of dicts of object contents
:param bucket: s3 bucket
:param prefix: s3 bucket prefix
:return: list of dicts of object contents
"""
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
session = aiobotocore.session.AioSession()
config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
contents = []
async with session.create_client('s3', config=config) as client:
worker_co = partial(get_object, client, bucket)
async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
contents.append(await work_pool.push(c['Key']))
# retrieve results from futures
contents = [c.result() for c in contents]
return list(chain.from_iterable(contents))
_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))