Reading multiple "bulked" jsons from s3 asynchronously. Is there a better way?

The goal is to load a large number of "bulked" jsons from s3. I found aiobotocore and felt compelled to try it, hoping for better efficiency and to get familiar with asyncio at the same time. I gave it a shot and it works, but I know basically nothing about asynchronous programming, so I'm hoping for some improvements/comments. Maybe some kind soul can spot some obvious mistakes.

The problem is that boto3 only supports one HTTP request at a time. By using a thread pool I got a significant improvement, but I'm hoping for a more efficient way.
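
For reference, the thread-pool version looked roughly like this (a minimal sketch with made-up helper names, not the exact code I ran):

import boto3
from concurrent.futures import ThreadPoolExecutor

_s3 = boto3.client('s3')  # boto3 low-level clients are thread-safe

def fetch_key(bucket, key):
    # one blocking GET per key
    return _s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode()

def fetch_all_threaded(bucket, keys, max_workers=20):
    # run the blocking downloads concurrently in a thread pool
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda k: fetch_key(bucket, k), keys))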

Here is the code:

Imports:

import os 
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE

A helper generator I found somewhere that yields decoded jsons from a string containing multiple jsons:

def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    '''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()
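
For example, a string containing two concatenated jsons yields two dicts:

list(iterload('{"a": 1} {"b": 2}'))   # -> [{'a': 1}, {'b': 2}]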

This function gets the keys from an s3 bucket with the given prefix:

# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''

    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys

This function gets the content for the provided key. On top of that, it returns the decoded jsons as a flat list:

async def get_object(loop,bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:


        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()    

    return list(iterload(content.decode()))       

This is the main function. It gathers the contents of all the keys found and flattens the list of contents.

async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:

        keys = await get_keys(loop, bucket, prefix)

        contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])     

        return list(chain.from_iterable(contents))

Finally, I run this and the resulting list of dicts ends up nicely in result:

loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))

Thanks in advance! Hope this helps someone in a similar situation.

First, take a look at aioboto3.
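
With recent aioboto3 versions a single download looks roughly like this (a sketch based on its session API; treat it as untested):

import aioboto3

async def get_object_aioboto3(bucket: str, key: str) -> bytes:
    session = aioboto3.Session()
    async with session.client('s3') as s3:
        response = await s3.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            return await stream.read()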

Second, each client in aiobotocore is associated with one aiohttp session. Each session can have at most max_pool_connections connections. This is why the basic aiobotocore example does an async with on create_client: the connection pool is closed when you're done with the client.
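
In other words, the pool size is capped per client, so one shared client should serve all your parallel requests (a minimal sketch of that pattern; the pool size here is arbitrary):

from aiobotocore.session import AioSession
from aiobotocore.config import AioConfig

async def example():
    session = AioSession()
    config = AioConfig(max_pool_connections=50)  # at most 50 concurrent connections
    async with session.create_client('s3', config=config) as client:
        ...  # every request made through this client shares the pool
    # leaving the block closes the client and its connection pool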

Here are some pointers:

  1. You should use a work pool (created by me, modularized by CaliDog) to avoid polluting your event loop. When using it, think of your workflow as a stream.
  2. This also avoids having to use asyncio.gather, which leaves tasks running in the background after the first exception is thrown.
  3. You should tune your work loop size together with max_pool_connections, and use only one client, supporting the number of tasks you want (or can, based on the compute required) to run in parallel.
  4. You really don't need to pass the loop around, as with modern Python versions there is one event loop per thread.
  5. You should use AWS profiles (the profile parameter to the Session init) / environment variables so you don't need to hardcode keys and region information; see the sketch after this list.
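
For pointer 5, a minimal sketch (the profile name is made up; the AWS_PROFILE environment variable works as well):

from aiobotocore.session import AioSession

# credentials and region are read from ~/.aws/credentials and ~/.aws/config
session = AioSession(profile='my-profile')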

Based on the above, here is how I would do it:

import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # Get json content from s3 object

    # get object from s3
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()

    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))
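
On Python 3.7+ you could equivalently drive it with asyncio.run, which creates and closes the event loop for you:

_result = asyncio.run(go('some-bucket', 'some-prefix'))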