aiohttp:限制并行请求的速率
aiohttp: rate limiting parallel requests
APIs 通常有用户必须遵守的速率限制。我们以 50 requests/second 为例。顺序请求需要 0.5-1 秒,因此速度太慢,无法接近该限制。但是,使用 aiohttp 的并行请求超出了速率限制。
要尽可能快地轮询 API,需要限制并行调用的速率。
到目前为止我找到的例子装饰session.get
,大致是这样的:
session.get = rate_limited(max_calls_per_second)(session.get)
这适用于连续调用。尝试在并行调用中实现此功能无法按预期工作。
这里有一些代码作为例子:
async with aiohttp.ClientSession() as session:
session.get = rate_limited(max_calls_per_second)(session.get)
tasks = (asyncio.ensure_future(download_coroutine(
timeout, session, url)) for url in urls)
process_responses_function(await asyncio.gather(*tasks))
问题是它会限制任务的排队速率。 gather
的执行仍将或多或少同时发生。两全其美 ;-).
是的,我在这里发现了一个类似的问题, but neither replies answer the actual question of limiting the rate of requests. Also the blog post from Quentin Pradet仅适用于限速排队。
总结一下:如何限制并行 aiohttp
请求的 每秒请求数 ?
如果我理解你的话,你想限制同时请求的数量?
asyncio
中有一个名为 Semaphore
的对象,它的工作方式类似于异步 RLock
。
semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
async with semaphore:
# do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])
已更新
假设我发出了 50 个并发请求,它们都在 2 秒内完成。因此,它不会触及限制(每秒仅 25 个请求)。
这意味着我应该发出 100 个并发请求,它们也都在 2 秒内完成(每秒 50 个请求)。但在您实际提出这些请求之前,您如何确定它们将完成多长时间?
或者,如果您不介意 每秒完成的请求数,但 每秒发出的请求数。您可以:
async def loop_wrap(urls):
for url in urls:
asyncio.ensure_future(download(url))
await asyncio.sleep(1/50)
asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()
上面的代码将每 1/50
秒创建一个 Future
实例。
我通过使用基于漏桶算法的速率限制器创建 aiohttp.ClientSession()
的子类来解决这个问题。我使用 asyncio.Queue()
而不是 Semaphores
来限制速率。我只是覆盖了 _request()
方法。我发现这种方法更简洁,因为您只需将 session = aiohttp.ClientSession()
替换为 session = ThrottledClientSession(rate_limit=15)
.
class ThrottledClientSession(aiohttp.ClientSession):
"""Rate-throttled client session class inherited from aiohttp.ClientSession)"""
MIN_SLEEP = 0.1
def __init__(self, rate_limit: float =None, *args,**kwargs) -> None:
super().__init__(*args,**kwargs)
self.rate_limit = rate_limit
self._fillerTask = None
self._queue = None
self._start_time = time.time()
if rate_limit != None:
if rate_limit <= 0:
raise ValueError('rate_limit must be positive')
self._queue = asyncio.Queue(min(2, int(rate_limit)+1))
self._fillerTask = asyncio.create_task(self._filler(rate_limit))
def _get_sleep(self) -> list:
if self.rate_limit != None:
return max(1/self.rate_limit, self.MIN_SLEEP)
return None
async def close(self) -> None:
"""Close rate-limiter's "bucket filler" task"""
if self._fillerTask != None:
self._fillerTask.cancel()
try:
await asyncio.wait_for(self._fillerTask, timeout= 0.5)
except asyncio.TimeoutError as err:
print(str(err))
await super().close()
async def _filler(self, rate_limit: float = 1):
"""Filler task to fill the leaky bucket algo"""
try:
if self._queue == None:
return
self.rate_limit = rate_limit
sleep = self._get_sleep()
updated_at = time.monotonic()
fraction = 0
extra_increment = 0
for i in range(0,self._queue.maxsize):
self._queue.put_nowait(i)
while True:
if not self._queue.full():
now = time.monotonic()
increment = rate_limit * (now - updated_at)
fraction += increment % 1
extra_increment = fraction // 1
items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
fraction = fraction % 1
for i in range(0,items_2_add):
self._queue.put_nowait(i)
updated_at = now
await asyncio.sleep(sleep)
except asyncio.CancelledError:
print('Cancelled')
except Exception as err:
print(str(err))
async def _allow(self) -> None:
if self._queue != None:
# debug
#if self._start_time == None:
# self._start_time = time.time()
await self._queue.get()
self._queue.task_done()
return None
async def _request(self, *args,**kwargs):
"""Throttled _request()"""
await self._allow()
return await super()._request(*args,**kwargs)
```
我喜欢@sraw 用 asyncio 解决这个问题,但他们的回答对我来说不太合适。因为我不知道我的下载调用是否会比速率限制更快或更慢,所以我希望可以选择 运行 在请求缓慢时并发 运行 一个请求非常快的时候,所以我总是在速率限制上。
我通过使用一个队列来实现这一点,生产者以速率限制产生新任务,然后许多消费者要么全部等待下一个工作,如果他们很快,或者将有工作备份如果他们很慢,队列将 运行 和 processor/network 允许的一样快:
import asyncio
from datetime import datetime
async def download(url):
# download or whatever
task_time = 1/10
await asyncio.sleep(task_time)
result = datetime.now()
return result, url
async def producer_fn(queue, urls, max_per_second):
for url in urls:
await queue.put(url)
await asyncio.sleep(1/max_per_second)
async def consumer(work_queue, result_queue):
while True:
url = await work_queue.get()
result = await download(url)
work_queue.task_done()
await result_queue.put(result)
urls = range(20)
async def main():
work_queue = asyncio.Queue()
result_queue = asyncio.Queue()
num_consumer_tasks = 10
max_per_second = 5
consumers = [asyncio.create_task(consumer(work_queue, result_queue))
for _ in range(num_consumer_tasks)]
producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
await producer
# wait for the remaining tasks to be processed
await work_queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
while not result_queue.empty():
result, url = await result_queue.get()
print(f'{url} finished at {result}')
asyncio.run(main())
至于这里关于调用 gather() 时同时发送 n 个请求的问题,关键是在每次调用之前使用 create_task() 和 await asyncio.sleep(1.1)
。使用 create_task 创建的任何任务立即 运行:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
限制并发连接数的另一个问题也在下面的示例中通过在 async_payload_wrapper 中使用 ClientSession() 上下文并设置连接器限制来解决。
通过此设置,我可以 运行 25 个协程 (THREADS=25),每个协程循环遍历 URL 队列并且不违反 25 个并发连接规则:
async def send_request(session, url, routine):
start_time = time.time()
print(f"{routine}, sending request: {datetime.now()}")
params = {
'api_key': 'nunya',
'url': '%s' % url,
'render_js': 'false',
'premium_proxy': 'false',
'country_code':'us'
}
try:
async with session.get(url='http://yourAPI.com',params=params,) as response:
data = await response.content.read()
print(f"{routine}, done request: {time.time() - start_time} seconds")
return data
except asyncio.TimeoutError as e:
print('timeout---------------------')
errors.append(url)
except aiohttp.ClientResponseError as e:
print('request failed - Server Error')
errors.append(url)
except Exception as e:
errors.append(url)
async def getData(session, q, test):
while True:
if not q.empty():
url = q.get_nowait()
resp = await send_request(session, url ,test)
if resp is not None:
processData(resp, test, url)
else:
print(f'{test} queue empty')
break
async def async_payload_wrapper():
tasks = []
q = asyncio.Queue()
for url in urls:
await q.put(url)
async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS), timeout=ClientTimeout(total=61), raise_for_status=True) as session:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(async_payload_wrapper())
我开发了一个名为 octopus-api (https://pypi.org/project/octopus-api/) 的库,它使您能够在后台使用 aiohttp 来限制速率并设置对端点的连接(并行)调用数量。它的目标是简化所有需要的 aiohttp 设置。
这里举例说明如何使用,其中get_ethereum是用户自定义的请求函数:
from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List
if __name__ == '__main__':
async def get_ethereum(session: TentacleSession, request: Dict):
async with session.get(url=request["url"], params=request["params"]) as response:
body = await response.json()
return body
client = OctopusApi(rate=50, resolution="sec", connections=6)
result: List = client.execute(requests_list=[{
"url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
"params": {}}] * 1000, func=get_ethereum)
print(result)
TentacleSession 的工作方式与您为 aiohttp.ClientSession 编写 POST、GET、PUT 和 PATCH 的方式相同。
让我知道它是否有助于解决与速率限制和并行调用相关的问题。
APIs 通常有用户必须遵守的速率限制。我们以 50 requests/second 为例。顺序请求需要 0.5-1 秒,因此速度太慢,无法接近该限制。但是,使用 aiohttp 的并行请求超出了速率限制。
要尽可能快地轮询 API,需要限制并行调用的速率。
到目前为止我找到的例子装饰session.get
,大致是这样的:
session.get = rate_limited(max_calls_per_second)(session.get)
这适用于连续调用。尝试在并行调用中实现此功能无法按预期工作。
这里有一些代码作为例子:
async with aiohttp.ClientSession() as session:
session.get = rate_limited(max_calls_per_second)(session.get)
tasks = (asyncio.ensure_future(download_coroutine(
timeout, session, url)) for url in urls)
process_responses_function(await asyncio.gather(*tasks))
问题是它会限制任务的排队速率。 gather
的执行仍将或多或少同时发生。两全其美 ;-).
是的,我在这里发现了一个类似的问题
总结一下:如何限制并行 aiohttp
请求的 每秒请求数 ?
如果我理解你的话,你想限制同时请求的数量?
asyncio
中有一个名为 Semaphore
的对象,它的工作方式类似于异步 RLock
。
semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
async with semaphore:
# do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])
已更新
假设我发出了 50 个并发请求,它们都在 2 秒内完成。因此,它不会触及限制(每秒仅 25 个请求)。
这意味着我应该发出 100 个并发请求,它们也都在 2 秒内完成(每秒 50 个请求)。但在您实际提出这些请求之前,您如何确定它们将完成多长时间?
或者,如果您不介意 每秒完成的请求数,但 每秒发出的请求数。您可以:
async def loop_wrap(urls):
for url in urls:
asyncio.ensure_future(download(url))
await asyncio.sleep(1/50)
asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()
上面的代码将每 1/50
秒创建一个 Future
实例。
我通过使用基于漏桶算法的速率限制器创建 aiohttp.ClientSession()
的子类来解决这个问题。我使用 asyncio.Queue()
而不是 Semaphores
来限制速率。我只是覆盖了 _request()
方法。我发现这种方法更简洁,因为您只需将 session = aiohttp.ClientSession()
替换为 session = ThrottledClientSession(rate_limit=15)
.
class ThrottledClientSession(aiohttp.ClientSession):
"""Rate-throttled client session class inherited from aiohttp.ClientSession)"""
MIN_SLEEP = 0.1
def __init__(self, rate_limit: float =None, *args,**kwargs) -> None:
super().__init__(*args,**kwargs)
self.rate_limit = rate_limit
self._fillerTask = None
self._queue = None
self._start_time = time.time()
if rate_limit != None:
if rate_limit <= 0:
raise ValueError('rate_limit must be positive')
self._queue = asyncio.Queue(min(2, int(rate_limit)+1))
self._fillerTask = asyncio.create_task(self._filler(rate_limit))
def _get_sleep(self) -> list:
if self.rate_limit != None:
return max(1/self.rate_limit, self.MIN_SLEEP)
return None
async def close(self) -> None:
"""Close rate-limiter's "bucket filler" task"""
if self._fillerTask != None:
self._fillerTask.cancel()
try:
await asyncio.wait_for(self._fillerTask, timeout= 0.5)
except asyncio.TimeoutError as err:
print(str(err))
await super().close()
async def _filler(self, rate_limit: float = 1):
"""Filler task to fill the leaky bucket algo"""
try:
if self._queue == None:
return
self.rate_limit = rate_limit
sleep = self._get_sleep()
updated_at = time.monotonic()
fraction = 0
extra_increment = 0
for i in range(0,self._queue.maxsize):
self._queue.put_nowait(i)
while True:
if not self._queue.full():
now = time.monotonic()
increment = rate_limit * (now - updated_at)
fraction += increment % 1
extra_increment = fraction // 1
items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
fraction = fraction % 1
for i in range(0,items_2_add):
self._queue.put_nowait(i)
updated_at = now
await asyncio.sleep(sleep)
except asyncio.CancelledError:
print('Cancelled')
except Exception as err:
print(str(err))
async def _allow(self) -> None:
if self._queue != None:
# debug
#if self._start_time == None:
# self._start_time = time.time()
await self._queue.get()
self._queue.task_done()
return None
async def _request(self, *args,**kwargs):
"""Throttled _request()"""
await self._allow()
return await super()._request(*args,**kwargs)
```
我喜欢@sraw 用 asyncio 解决这个问题,但他们的回答对我来说不太合适。因为我不知道我的下载调用是否会比速率限制更快或更慢,所以我希望可以选择 运行 在请求缓慢时并发 运行 一个请求非常快的时候,所以我总是在速率限制上。
我通过使用一个队列来实现这一点,生产者以速率限制产生新任务,然后许多消费者要么全部等待下一个工作,如果他们很快,或者将有工作备份如果他们很慢,队列将 运行 和 processor/network 允许的一样快:
import asyncio
from datetime import datetime
async def download(url):
# download or whatever
task_time = 1/10
await asyncio.sleep(task_time)
result = datetime.now()
return result, url
async def producer_fn(queue, urls, max_per_second):
for url in urls:
await queue.put(url)
await asyncio.sleep(1/max_per_second)
async def consumer(work_queue, result_queue):
while True:
url = await work_queue.get()
result = await download(url)
work_queue.task_done()
await result_queue.put(result)
urls = range(20)
async def main():
work_queue = asyncio.Queue()
result_queue = asyncio.Queue()
num_consumer_tasks = 10
max_per_second = 5
consumers = [asyncio.create_task(consumer(work_queue, result_queue))
for _ in range(num_consumer_tasks)]
producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
await producer
# wait for the remaining tasks to be processed
await work_queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
while not result_queue.empty():
result, url = await result_queue.get()
print(f'{url} finished at {result}')
asyncio.run(main())
至于这里关于调用 gather() 时同时发送 n 个请求的问题,关键是在每次调用之前使用 create_task() 和 await asyncio.sleep(1.1)
。使用 create_task 创建的任何任务立即 运行:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
限制并发连接数的另一个问题也在下面的示例中通过在 async_payload_wrapper 中使用 ClientSession() 上下文并设置连接器限制来解决。
通过此设置,我可以 运行 25 个协程 (THREADS=25),每个协程循环遍历 URL 队列并且不违反 25 个并发连接规则:
async def send_request(session, url, routine):
start_time = time.time()
print(f"{routine}, sending request: {datetime.now()}")
params = {
'api_key': 'nunya',
'url': '%s' % url,
'render_js': 'false',
'premium_proxy': 'false',
'country_code':'us'
}
try:
async with session.get(url='http://yourAPI.com',params=params,) as response:
data = await response.content.read()
print(f"{routine}, done request: {time.time() - start_time} seconds")
return data
except asyncio.TimeoutError as e:
print('timeout---------------------')
errors.append(url)
except aiohttp.ClientResponseError as e:
print('request failed - Server Error')
errors.append(url)
except Exception as e:
errors.append(url)
async def getData(session, q, test):
while True:
if not q.empty():
url = q.get_nowait()
resp = await send_request(session, url ,test)
if resp is not None:
processData(resp, test, url)
else:
print(f'{test} queue empty')
break
async def async_payload_wrapper():
tasks = []
q = asyncio.Queue()
for url in urls:
await q.put(url)
async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS), timeout=ClientTimeout(total=61), raise_for_status=True) as session:
for i in range(THREADS):
await asyncio.sleep(1.1)
tasks.append(
asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
)
await asyncio.gather(*tasks)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(async_payload_wrapper())
我开发了一个名为 octopus-api (https://pypi.org/project/octopus-api/) 的库,它使您能够在后台使用 aiohttp 来限制速率并设置对端点的连接(并行)调用数量。它的目标是简化所有需要的 aiohttp 设置。
这里举例说明如何使用,其中get_ethereum是用户自定义的请求函数:
from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List
if __name__ == '__main__':
async def get_ethereum(session: TentacleSession, request: Dict):
async with session.get(url=request["url"], params=request["params"]) as response:
body = await response.json()
return body
client = OctopusApi(rate=50, resolution="sec", connections=6)
result: List = client.execute(requests_list=[{
"url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
"params": {}}] * 1000, func=get_ethereum)
print(result)
TentacleSession 的工作方式与您为 aiohttp.ClientSession 编写 POST、GET、PUT 和 PATCH 的方式相同。
让我知道它是否有助于解决与速率限制和并行调用相关的问题。