aiohttp: rate limiting requests over an unreliable internet connection
I'm downloading content from a website with a very strict rate limit. If I exceed 10 requests/second, I'm banned for 10 minutes. I've been using the following code to rate-limit aiohttp:
import asyncio
import time


class RateLimitedClientSession:
    """Rate-limited client.

    Attributes:
        client (aiohttp.ClientSession): The wrapped client session.
        rate_limit (int): Maximum number of requests per second to make.

    Based on https://quentin.pradet.me/blog/how-do-you-rate-limit-calls-with-aiohttp.html
    """

    def __init__(self, client, rate_limit):
        self.client = client
        self.rate_limit = rate_limit
        self.max_tokens = rate_limit
        self.tokens = self.max_tokens
        self.updated_at = time.monotonic()
        self.start = time.monotonic()

    async def get(self, *args, **kwargs):
        """Wrapper for ``client.get`` that first waits for a token."""
        await self.wait_for_token()
        return self.client.get(*args, **kwargs)

    async def wait_for_token(self):
        """Sleeps until a new token is available."""
        while self.tokens < 1:
            self.add_new_tokens()
            await asyncio.sleep(0.03)  # Arbitrary delay, must be small though.
        self.tokens -= 1

    def add_new_tokens(self):
        """Adds new tokens if enough time has elapsed to earn at least one."""
        now = time.monotonic()
        time_since_update = now - self.updated_at
        new_tokens = time_since_update * self.rate_limit
        if self.tokens + new_tokens >= 1:
            self.tokens = min(self.tokens + new_tokens, self.max_tokens)
            self.updated_at = now
Then I use it like this:
import asyncio

from aiohttp import ClientSession, TCPConnector

limit = 9  # 9 requests per second
inputs = ['url1', 'url2', 'url3', ...]


async def main():
    # Note: TCPConnector's ``limit`` caps concurrent connections, not requests
    # per second; the per-second limit is enforced by RateLimitedClientSession.
    conn = TCPConnector(limit=limit)
    raw_client = ClientSession(connector=conn, headers={'Connection': 'keep-alive'})
    async with raw_client:
        session = RateLimitedClientSession(raw_client, limit)
        tasks = [asyncio.ensure_future(download_link(link, session)) for link in inputs]
        for task in asyncio.as_completed(tasks):
            await task
async def download_link(link, session):
    async with await session.get(link) as resp:
        data = await resp.read()
        # Then write data to a file
My problem is that the code runs fine for a random stretch, usually somewhere between 100 and 2000 requests, and then it dies because the rate limit was hit. I suspect this is related to my internet latency.
For example, imagine a limit of 3 requests/second:
SECOND 1:
+ REQ 1
+ REQ 2
+ REQ 3
SECOND 2:
+ REQ 4
+ REQ 5
+ REQ 6
With a little latency, that can end up looking like:
SECOND 1:
+ REQ 1
+ REQ 2
SECOND 2:
+ REQ 3 - rolled over from previous second due to internet speed
+ REQ 4
+ REQ 5
+ REQ 6
which then trips the rate limit.
What can I do to minimize the chance of this happening?
I've already tried lowering the rate limit; it does run for longer, but eventually it still hits the rate limit.
I've also tried firing each request 1/10 of a second apart, but that still triggers the rate limit (possibly for an unrelated reason?).
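For reference, the evenly spaced attempt looked roughly like the sketch below. This is my reconstruction rather than the exact code: it reuses download_link from above and simply staggers task creation by 0.1 s, letting the responses themselves complete concurrently.

import asyncio


async def run_spaced(links, session, interval=0.1):
    # Hypothetical helper: start one request every ``interval`` seconds,
    # while earlier responses are still being awaited in the background.
    tasks = []
    for link in links:
        tasks.append(asyncio.ensure_future(download_link(link, session)))
        await asyncio.sleep(interval)
    await asyncio.gather(*tasks)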
I think the best solution is to send the requests in batches and sleep away the remainder of the second. I'm no longer using the rate-limiting wrapper around aiohttp.
import asyncio
import time

import aiohttp


async def download_link(link, session):
    async with session.get(link) as resp:
        data = await resp.read()
        # Then write data to a file


def batch(iterable, n):
    """Yield successive chunks of ``n`` items from ``iterable``."""
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]


async def main():
    rate_limit = 10
    conn = aiohttp.TCPConnector(limit=rate_limit)
    client = aiohttp.ClientSession(
        connector=conn, headers={'Connection': 'keep-alive'}, raise_for_status=True)
    async with client:
        # inputs is the list of URLs shown earlier
        for group in batch(inputs, rate_limit):
            start = time.monotonic()
            tasks = [download_link(link, client) for link in group]
            await asyncio.gather(*tasks)  # If results are needed they can be assigned here
            execution_time = time.monotonic() - start
            # If execution_time > 1 the sleep is skipped; those extra seconds are
            # essentially wasted, but that is a small price to pay.
            await asyncio.sleep(max(0, 1 - execution_time))


asyncio.run(main())
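Because asyncio.gather waits for the entire batch before the sleep, at most rate_limit requests are started in any one-second window, no matter how slowly individual responses come back. If the server still bans occasionally, one defensive extension (my own sketch, not part of the solution above; it assumes the ban is reported as HTTP 429 and clears after the 10-minute window) is to catch the error raised by raise_for_status=True and sleep out the penalty before retrying the batch:

import asyncio

import aiohttp

BAN_SECONDS = 10 * 60  # assumed length of the ban window


async def gather_batch_with_retry(group, client):
    # Hypothetical helper: retry the whole batch after sleeping out a suspected ban.
    while True:
        try:
            tasks = [download_link(link, client) for link in group]
            return await asyncio.gather(*tasks)
        except aiohttp.ClientResponseError as exc:
            if exc.status != 429:  # assumption: the ban surfaces as 429 Too Many Requests
                raise
            await asyncio.sleep(BAN_SECONDS)

This would simply replace the plain asyncio.gather call inside the loop above.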