Asyncio & rate limiting
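I'm writing an application based on the asyncio framework. It interacts with an API that has a rate limit (at most 2 calls per second). I therefore moved the methods that talk to the API into Celery so that I could use it as a rate limiter, but that feels like unnecessary overhead.
Is there any way to create a new asyncio event loop (or something else) that guarantees coroutines are executed no more than n times per second?
I believe you can write a loop like this: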
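# Runs inside a coroutine. Spacing call starts 0.5 s apart keeps the rate at or below 2 calls per second.
loop = asyncio.get_running_loop()
while True:
    t0 = loop.time()
    await make_io_call()
    dt = loop.time() - t0
    if dt < 0.5:
        # The loop= argument of asyncio.sleep was removed in Python 3.10.
        await asyncio.sleep(0.5 - dt)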
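The loop above can be tried out as a small self-contained script. This is only a sketch: `make_io_call` here is a hypothetical stand-in that sleeps briefly instead of calling a real API, and `paced_calls`, `n_calls` and `min_interval` are names invented for the illustration. The loop is bounded so that the script terminates.

```python
import asyncio


async def make_io_call():
    # Hypothetical stand-in for the real API request.
    await asyncio.sleep(0.01)


async def paced_calls(n_calls, min_interval):
    """Run n_calls one after another, starting them at least min_interval seconds apart."""
    loop = asyncio.get_running_loop()
    start_times = []
    for _ in range(n_calls):
        t0 = loop.time()
        start_times.append(t0)
        await make_io_call()
        dt = loop.time() - t0
        if dt < min_interval:
            # Sleep only for the remainder of the interval.
            await asyncio.sleep(min_interval - dt)
    return start_times


if __name__ == "__main__":
    starts = asyncio.run(paced_calls(n_calls=3, min_interval=0.5))
    # Gaps between consecutive call starts, in seconds.
    print([round(b - a, 2) for a, b in zip(starts, starts[1:])])
```

With `min_interval=0.5` the gaps between call starts should come out at roughly half a second, which corresponds to the 2 calls per second limit from the question.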
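The accepted answer is accurate. Note, however, that one usually wants to get as close to 2 QPS as possible. The loop above awaits each call before starting the next, so it provides no parallelism; if make_io_call() takes longer than the pacing interval (for example, more than a second), the achieved rate falls below the limit. A better solution is to pass a semaphore to make_io_call, which the call can use to find out whether it is allowed to start.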
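To make the parallelism point concrete, here is a minimal sketch that compares the sequential loop with a version that starts each call as a task on a fixed schedule. Everything in it is illustrative: `slow_call`, `sequential`, `overlapped` and the chosen durations are assumptions, and the overlapped variant is a deliberately simple fixed-schedule launcher, not the semaphore described in this answer.

```python
import asyncio


async def slow_call(duration):
    # Hypothetical stand-in for an API request that takes `duration` seconds.
    await asyncio.sleep(duration)


async def sequential(n_calls, interval, duration):
    """Await each call before pacing the next one, as in the simple loop."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    for _ in range(n_calls):
        t0 = loop.time()
        await slow_call(duration)
        dt = loop.time() - t0
        if dt < interval:
            await asyncio.sleep(interval - dt)
    return loop.time() - start


async def overlapped(n_calls, interval, duration):
    """Start one call per interval as a task, so slow calls run concurrently."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    tasks = []
    for i in range(n_calls):
        tasks.append(asyncio.create_task(slow_call(duration)))
        if i < n_calls - 1:
            # Wait one interval before launching the next call.
            await asyncio.sleep(interval)
    await asyncio.gather(*tasks)
    return loop.time() - start


async def main():
    seq = await sequential(4, interval=0.1, duration=0.3)
    par = await overlapped(4, interval=0.1, duration=0.3)
    print(f"sequential: {seq:.2f}s, overlapped: {par:.2f}s")
    return seq, par


if __name__ == "__main__":
    asyncio.run(main())
```

Because each call takes longer than the interval, the sequential version is bound by the call duration, while the overlapped version still starts calls no more often than once per interval and finishes sooner.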
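The RateLimitingSemaphore implementation below follows the semaphore approach: it only lets a caller into its context once the recent call rate has dropped below the limit.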
import asyncio
from collections import deque
from datetime import datetime


class RateLimitingSemaphore:
    def __init__(self, qps_limit, loop=None):
        # Create the semaphore from inside a coroutine so that this picks up the running loop.
        self.loop = loop or asyncio.get_event_loop()
        self.qps_limit = qps_limit

        # The number of calls that are queued up, waiting for their turn.
        self.queued_calls = 0

        # The times of the last N executions, where N=qps_limit. This should allow us to
        # calculate the QPS within the last ~ second. Note that this also allows us to
        # schedule the first N executions immediately.
        self.call_times = deque()

    async def __aenter__(self):
        self.queued_calls += 1
        while True:
            cur_rate = 0
            if len(self.call_times) == self.qps_limit:
                window = self.loop.time() - self.call_times[0]
                # Guard against a zero-length window on clocks with coarse resolution.
                cur_rate = len(self.call_times) / window if window > 0 else float('inf')
            if cur_rate < self.qps_limit:
                break
            # Over the limit: wait roughly until this caller's slot in the queue.
            interval = 1.0 / self.qps_limit
            elapsed_time = self.loop.time() - self.call_times[-1]
            await asyncio.sleep(self.queued_calls * interval - elapsed_time)
        self.queued_calls -= 1

        # Record this execution, keeping only the last N timestamps.
        if len(self.call_times) == self.qps_limit:
            self.call_times.popleft()
        self.call_times.append(self.loop.time())

    async def __aexit__(self, exc_type, exc, tb):
        pass


async def test(qps):
    executions = 0

    async def io_operation(semaphore):
        nonlocal executions
        async with semaphore:
            executions += 1

    semaphore = RateLimitingSemaphore(qps)
    start = datetime.now()
    # asyncio.gather accepts bare coroutines; asyncio.wait no longer does on Python 3.11+.
    await asyncio.gather(*(io_operation(semaphore) for _ in range(5 * qps)))
    dt = (datetime.now() - start).total_seconds()
    print('Desired QPS:', qps, 'Achieved QPS:', executions / dt)


if __name__ == "__main__":
    # asyncio.run creates the event loop, runs the coroutine and closes the loop.
    asyncio.run(test(100))
This prints something like: Desired QPS: 100 Achieved QPS: 99.82723898022084