使用 Python aiohttp 时将多个 API 请求合并为一个

Combining multiple API requests into one when using Python aiohttp

我有一个 API 端点,它可以接受一个或多个对象 ID 和 return 对它们的响应,例如 http://example.com/api/metadata?id=1&id=2&id=3。 API 端点的速率受限于每次调用而不是每个 ID,因此最好使用多个 ID 调用 API 端点。

另一方面,我有尝试获取每个 ID 元数据的现有代码,例如:

async def get_metadata(object_id):
    response = await session.get(f"http://example.com/api/metadata?id={object_id}")
    response.raise_for_status()
    return (await response.json())['results'][object_id]

我想保持此函数的签名不变,但对其进行更改,使其不执行单独的请求,而是阻塞直到 a) 50 个 ID 已准备好获取 b) 内部发生 10 秒之类的超时其中一些但不是 50 个 ID 已准备好获取。然后发出一个 API 请求,然后每次(阻塞)调用 get_metadata returns 相应的结果。所以 get_metadata 的外部行为应该保持不变。

我尝试了一些使用信号量或队列的方法,但我卡住了。那么实现这个的好方法是什么?

信号量在这里不起作用,因为它们的工作方式与您的需要相反:它们不会阻塞,直到一定数量的协程获取它们。您需要 barrier 的 asyncio 等价物,不幸的是,标准库中不存在它。

幸运的是,使用事件和列表来实现屏障并不难。你可以这样做(只是模糊测试):

_waiters = []
_have_new_waiter = None

async def get_metadata(session, object_id):
    global _have_new_waiter
    if _have_new_waiter is None:
        _have_new_waiter = asyncio.Event()
        asyncio.create_task(_monitor_incoming(session))

    future = asyncio.get_event_loop().create_future()
    _waiters.append((object_id, future))
    _have_new_waiter.set()
    return await future

async def _monitor_incoming(session):
    while True:
        timeout = False
        try:
            await asyncio.wait_for(_have_new_waiter.wait(), 10)
        except asyncio.TimeoutError:
            timeout = True
        _have_new_waiter.clear()
        if len(_waiters) == 0 or len(_waiters) < 50 and not timeout:
            continue
        lst = _waiters[:]
        del _waiters[:]
        asyncio.create_task(_get_batch(session, lst))

async def _get_batch(session, waiter_lst):
    object_ids = [object_id for (object_id, _future) in waiter_lst]
    try:
        async with session.get(
                f"http://example.com/api/metadata?ids={'&'.join(map(str, object_ids))}"):
            response.raise_for_status()
            dct = response.json()['results']
    except Exception as e:
        for result, (_object_id, future) in zip(results, waiter_lst):
            future.set_exception(e)
        return
    results = [results[object_id] for object_id in object_ids]
    for result, (_object_id, future) in zip(results, waiter_lst):
        future.set_result(result)

经过一段时间的修修补补,我想到了这个:

import abc, asyncio

class Batcher(metaclass=abc.ABCMeta):
    def __init__(self, *, max_batch, timeout):
        """
        Constructs a new Batcher. The parameter max_batch specifies
        the queue capacity, while timeout is the deadline after which
        a queue will be processed whether it’s at capacity or not.
        """
        self.__batch = None
        self.__event = None
        self.__timeout = timeout
        self.__maxsize = max_batch

    async def __wait(self, event, batch):
        try:
            await asyncio.wait_for(event.wait(), timeout=self.__timeout)
        except asyncio.TimeoutError:
            self.__event = None
        await self.__run(self.__batch)

    async def __run(self, batch):
        self.__batch = None
        try:
            await self._process(batch)
        except Exception as e:
            for _, future in batch:
                if future.done():
                    continue
                future.set_exception(e)
        else:
            for _, future in batch:
                if future.done():
                    continue
                future.set_result(None)

    def _setup(self):
        """
        Initialises a new batch.
        """
        if self.__event is not None:
            return
        self.__batch = []
        self.__event = asyncio.Event()
        asyncio.create_task(self.__wait(self.__event, self.__batch))

    def _finish(self):
        """
        Marks the current batch as complete and starts processing it.
        """
        self.__batch = None
        self.__event.set()
        self.__event = None 

    def _enqueue(self, item):
        """
        Adds an item to be processed in the next batch.

        Returns: an awaitable that will return the result of processing
        when awaited.
        """
        self._setup()
        future = asyncio.Future()
        self.__batch.append((item, future))
        if len(self.__batch) >= self.__maxsize:
            self._finish()
        return future

    @abc.abstractmethod
    async def _process(self, batch):
        """
        Processes the current batch. The batch parameter contains a list
        of pairs (item, future), where item is the value passed to _enqueue,
        while future is an asyncio.Future. Call the .set_result and/or
        .set_exception methods on the latter to return a result to the
        caller; if you don’t assign a result yourself, the returned value
        will be None.
        """
        raise NotImplementedError

您可以子class Batcher 围绕 _enqueue 创建一个外观,它将验证参数并准备它们以供处理。

示例:

import urllib

def singleton(*args, **kwargs):
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(max_batch=50, timeout=10)
class get_metadata(Batcher):
    async def _process(self, batch):
        qs = "&".join(
            f"id={urllib.parse.quote(str(object_id))}"
            for object_id, _ in batch
        )

        response = await session.get("http://example.com/api/metadata?" + qs)
        response.raise_for_status()

        results = (await response.json())['results']
        for object_id, future in batch:
            try:
                future.set_result(results[object_id])
            except Exception as e:
                future.set_exception(e)

    async def __call__(self, object_id):
        if not isinstance(object_id, int):
            raise ValueError(object_id)
        return await self._enqueue(object_id)

此处,get_metadata 是一个 class 实例,但由于 __call__ 特殊方法,您可以像调用普通函数一样调用它。