Combining multiple API requests into one when using Python aiohttp
I have an API endpoint that can take one or more object IDs and return responses for them, e.g. http://example.com/api/metadata?id=1&id=2&id=3. The API endpoint is rate-limited per call rather than per ID, so it is preferable to call it with multiple IDs at once, as in the sketch below.
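For illustration, a single request that fetches the metadata for several IDs might look roughly like this (fetch_many is a hypothetical helper; aiohttp encodes a list of pairs as repeated query parameters):

import aiohttp

async def fetch_many(session: aiohttp.ClientSession, object_ids):
    # Encodes as ?id=1&id=2&id=3, matching the endpoint above.
    params = [("id", str(object_id)) for object_id in object_ids]
    async with session.get("http://example.com/api/metadata",
                           params=params) as response:
        response.raise_for_status()
        return (await response.json())['results']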
On the other hand, I have existing code that fetches the metadata for each ID individually, e.g.:
async def get_metadata(object_id):
    response = await session.get(f"http://example.com/api/metadata?id={object_id}")
    response.raise_for_status()
    return (await response.json())['results'][object_id]
I would like to keep this function's signature unchanged, but modify it so that instead of performing an individual request, it blocks until either a) 50 IDs are ready to be fetched, or b) a timeout of, say, 10 seconds has elapsed with some, but fewer than 50, IDs ready to be fetched. A single API request is then made, and each (blocked) call to get_metadata returns its corresponding result. So the external behaviour of get_metadata should remain unchanged.

I tried a few things with semaphores and queues, but I got stuck. So what would be a good way to implement this?
Semaphores won't work here because they operate in the opposite way from what you need: they don't block until a certain number of coroutines have acquired them. What you need is the asyncio equivalent of a barrier, which unfortunately doesn't exist in the standard library. (Python 3.11 later added asyncio.Barrier, though it has no built-in timeout.)

Fortunately, it is not hard to implement a barrier using an event and a list. You could do it like this (only vaguely tested):
import asyncio

_waiters = []
_have_new_waiter = None

async def get_metadata(session, object_id):
    global _have_new_waiter
    if _have_new_waiter is None:
        # First call: start the background task that batches requests.
        _have_new_waiter = asyncio.Event()
        asyncio.create_task(_monitor_incoming(session))
    future = asyncio.get_event_loop().create_future()
    _waiters.append((object_id, future))
    _have_new_waiter.set()
    return await future

async def _monitor_incoming(session):
    while True:
        timeout = False
        try:
            await asyncio.wait_for(_have_new_waiter.wait(), 10)
        except asyncio.TimeoutError:
            timeout = True
        _have_new_waiter.clear()
        # Dispatch a batch once 50 IDs have accumulated, or on timeout
        # with however many IDs are waiting (if any).
        if len(_waiters) == 0 or (len(_waiters) < 50 and not timeout):
            continue
        batch = _waiters[:]
        del _waiters[:]
        asyncio.create_task(_get_batch(session, batch))

async def _get_batch(session, waiter_lst):
    object_ids = [object_id for (object_id, _future) in waiter_lst]
    query = "&".join(f"id={object_id}" for object_id in object_ids)
    try:
        async with session.get(
                f"http://example.com/api/metadata?{query}") as response:
            response.raise_for_status()
            results = (await response.json())['results']
    except Exception as e:
        # Propagate the failure to every caller waiting on this batch.
        for _object_id, future in waiter_lst:
            future.set_exception(e)
        return
    for object_id, future in waiter_lst:
        future.set_result(results[object_id])
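With this in place, callers keep the original one-ID-at-a-time interface while concurrent calls are coalesced behind the scenes. A minimal usage sketch, assuming the placeholder endpoint above:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        # 60 concurrent calls: the event wakes the monitor, which sees
        # at least 50 waiters and sends them all in a single request.
        results = await asyncio.gather(
            *(get_metadata(session, object_id) for object_id in range(60))
        )
        print(results)

asyncio.run(main())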
After tinkering with it for a while, I came up with this:
import abc
import asyncio

class Batcher(metaclass=abc.ABCMeta):
    def __init__(self, *, max_batch, timeout):
        """
        Constructs a new Batcher. The parameter max_batch specifies
        the queue capacity, while timeout is the deadline after which
        a queue will be processed whether it's at capacity or not.
        """
        self.__batch = None
        self.__event = None
        self.__timeout = timeout
        self.__maxsize = max_batch

    async def __wait(self, event, batch):
        # Wait until the batch fills up (_finish sets the event) or the
        # timeout expires with a partial batch.
        try:
            await asyncio.wait_for(event.wait(), timeout=self.__timeout)
        except asyncio.TimeoutError:
            # Timed out: detach the partial batch so that newly enqueued
            # items start a fresh one.
            if self.__event is event:
                self.__event = None
                self.__batch = None
        await self.__run(batch)

    async def __run(self, batch):
        try:
            await self._process(batch)
        except Exception as e:
            # Report the failure to every caller still waiting.
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)
        else:
            # Callers whose future _process didn't resolve get None.
            for _, future in batch:
                if not future.done():
                    future.set_result(None)

    def _setup(self):
        """
        Initialises a new batch.
        """
        if self.__event is not None:
            return
        self.__batch = []
        self.__event = asyncio.Event()
        asyncio.create_task(self.__wait(self.__event, self.__batch))

    def _finish(self):
        """
        Marks the current batch as complete and starts processing it.
        """
        self.__batch = None
        self.__event.set()
        self.__event = None

    def _enqueue(self, item):
        """
        Adds an item to be processed in the next batch.

        Returns: an awaitable that will return the result of processing
        when awaited.
        """
        self._setup()
        future = asyncio.Future()
        self.__batch.append((item, future))
        if len(self.__batch) >= self.__maxsize:
            self._finish()
        return future

    @abc.abstractmethod
    async def _process(self, batch):
        """
        Processes the current batch. The batch parameter contains a list
        of pairs (item, future), where item is the value passed to
        _enqueue, while future is an asyncio.Future. Call the .set_result
        and/or .set_exception methods on the latter to return a result to
        the caller; if you don't assign a result yourself, the returned
        value will be None.
        """
        raise NotImplementedError
You can subclass Batcher to create a façade around _enqueue that validates the arguments and prepares them for processing. Example:
import urllib.parse

def singleton(*args, **kwargs):
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(max_batch=50, timeout=10)
class get_metadata(Batcher):
    async def _process(self, batch):
        qs = "&".join(
            f"id={urllib.parse.quote(str(object_id))}"
            for object_id, _ in batch
        )
        # session is assumed to be an aiohttp.ClientSession in scope.
        response = await session.get("http://example.com/api/metadata?" + qs)
        response.raise_for_status()
        results = (await response.json())['results']
        for object_id, future in batch:
            try:
                future.set_result(results[object_id])
            except Exception as e:
                # E.g. KeyError if the API returned no entry for this ID.
                future.set_exception(e)

    async def __call__(self, object_id):
        if not isinstance(object_id, int):
            raise ValueError(object_id)
        return await self._enqueue(object_id)
Here, get_metadata is a class instance, but thanks to the __call__ special method it can be invoked just like an ordinary function.
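Usage is then the same as with the original coroutine. A small sketch, assuming session is created at module level as the _process method above expects:

import asyncio
import aiohttp

async def main():
    global session
    session = aiohttp.ClientSession()
    try:
        # Only two IDs are queued, so this batch is dispatched after the
        # 10-second timeout rather than on reaching max_batch.
        meta1, meta2 = await asyncio.gather(get_metadata(1), get_metadata(2))
        print(meta1, meta2)
    finally:
        await session.close()

asyncio.run(main())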