aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore for limiting the number of concurrent connections
I thought I'd like to learn the new Python async/await syntax, and more specifically the asyncio module, by writing a simple script that lets you download multiple resources at once.
But now I'm stuck.
While researching I came across two options to limit the number of concurrent requests:
- Pass an aiohttp.TCPConnector (with a limit argument) to an aiohttp.ClientSession, or
- use an asyncio.Semaphore.
Is there a preferred option, or can they be used interchangeably if all you want is to limit the number of concurrent connections?
Are they (roughly) equal in terms of performance?
Also, both seem to have a default of 100 concurrent connections/operations. If I only use a Semaphore with a limit of, say, 500, will the aiohttp internals implicitly lock me down to 100 concurrent connections?
This is all very new and unclear to me. Please feel free to point out any misunderstandings on my part or flaws in my code.
Here is my code, currently containing both options (which one should I delete?):
Bonus questions:
- How do I handle (preferably retry x times) coros that threw an error?
- What is the best way to save the returned data (notify my DataHandler) as soon as a coro finishes? I don't want it all saved at the end, because I could start working with the results as soon as possible.
import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):
        self.concurrent_connections = concurrent_connections
        self.silent = silent
        self.data_handler = data_handler or DummyDataHandler()
        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # This is option 1: The semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        with (await self.semaphore):
            async with session.get(url) as response:
                # Bonus Question 1: What is the best way to retry a request that failed?
                resp_task = asyncio.ensure_future(response.read())
                self.sending_bar.update(1)
                resp = await resp_task

                await response.release()
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # This is option 2: Limiting the number of open connections directly via the TCPConnector
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content_task = asyncio.ensure_future(self.fetch(session, url))
        content = await content_task

        # Bonus Question 2: This is blocking, I know. Should this be wrapped in another coro
        # or should I use something like asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Reveived', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()

### call like so ###
URL_PATTERN = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield URL_PATTERN.format(i)

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download([g for g in gen_url(upper=1000)])
Is there a preferred option?
Yes, see below:
Will the aiohttp internals lock me down to 100 concurrent connections implicitly?
Yes, the default value of 100 will lock you down unless you specify another limit.
You can see it in the source here: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084
Are they (roughly) equal in terms of performance?
No (although the performance difference should be negligible), because aiohttp.TCPConnector
checks for available connections anyway, whether or not it is wrapped in a Semaphore; using a Semaphore here would just be unnecessary overhead.
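As a minimal sketch of that point (the fetch_all helper and the concurrency value of 500 are illustrative assumptions, not something the linked source prescribes), raising the connector's limit is all that is needed; no Semaphore on top:

import asyncio
import aiohttp

async def fetch_all(urls, concurrency=500):
    # One knob: the connector's limit (default 100) caps the number of open
    # connections, and aiohttp queues further requests internally.
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        async def fetch(url):
            async with session.get(url) as response:
                return await response.read()
        return await asyncio.gather(*(fetch(url) for url in urls))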
How do I handle (preferably retry x times) coros that threw an error?
I don't believe there is a standard way to do this, but one solution would be to wrap your call in a method like this:
async def retry_requests(...):
    for i in range(5):
        try:
            return await session.get(...)
        except aiohttp.ClientResponseError:
            pass
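A fleshed-out version of that idea might look like the sketch below; the function name, the retry count, and the linear backoff delay are assumptions made for illustration, not part of the answer above:

import asyncio
import aiohttp

# Hedged sketch: one concrete way to expand the wrapper above.
async def retry_get(session, url, retries=5, delay=1.0):
    for attempt in range(retries):
        try:
            async with session.get(url) as response:
                response.raise_for_status()  # raises aiohttp.ClientResponseError on 4xx/5xx
                return await response.read()
        except aiohttp.ClientResponseError:
            if attempt == retries - 1:
                raise  # out of attempts, let the caller handle it
            await asyncio.sleep(delay * (attempt + 1))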
How do I handle (preferably retry x times) coros that threw an error?
I created a Python decorator to handle this:
import logging
import time
from functools import wraps

def retry(exceptions, tries=3, delay=2, backoff=2):
    """
    Retry calling the decorated function using an exponential backoff. This
    was written to retry Braze API requests whenever they raise exceptions.

    Args:
        exceptions: The exception to check. May be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. a value of 2 will double the delay
            each retry).
    """
    def deco_retry(func):
        @wraps(func)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    msg = '{}, Retrying in {} seconds...'.format(e, mdelay)
                    if logging:
                        logging.warning(msg)
                    else:
                        print(msg)
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            return func(*args, **kwargs)
        return f_retry
    return deco_retry
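Usage would look roughly like the sketch below; flaky_call is a hypothetical stand-in for a blocking request. Note that the decorator as written is synchronous (it uses time.sleep and calls func directly), so it suits blocking calls rather than coros:

import random

# Hypothetical usage of the decorator above: retry a flaky call up to 3 times,
# doubling the delay after each failure.
@retry(exceptions=ValueError, tries=3, delay=2, backoff=2)
def flaky_call():
    if random.random() < 0.5:
        raise ValueError('transient failure')
    return 'ok'

print(flaky_call())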