如何重用aiohttp ClientSession池?
How to reuse aiohttp ClientSession pool?
文档说要重用 ClientSession:
Don’t create a session per request. Most likely you need a session per
application which performs all requests altogether.
A session contains a connection pool inside, connection reusage and
keep-alives (both are on by default) may speed up total performance.1
但是文档中似乎没有任何关于如何做到这一点的解释?有一个可能相关的示例,但它没有说明如何在别处重用池:http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing
这样的事情是正确的做法吗?
# NOTE(review): snippet quoted from the question — indentation was lost in the paste.
# Sanic listeners create shared pools at startup and tear them down at stop.
@app.listener('before_server_start')
async def before_server_start(app, loop):
app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
app.http_session_pool = aiohttp.ClientSession()
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
# NOTE(review): ClientSession.close() is a coroutine — this call is not awaited
# (the answer below addresses exactly this).
app.http_session_pool.close()
app.pg_pool.close()
@app.post("/api/register")
async def register(request):
# json validation
async with app.pg_pool.acquire() as pg:
await pg.execute() # create unactivated user in db
# NOTE(review): entering the session as a context manager closes it on exit,
# so a second request through this handler would use a closed session.
async with app.http_session_pool as session:
# TODO send activation email using SES API
async with session.post('http://httpbin.org/post', data=b'data') as resp:
print(resp.status)
print(await resp.text())
return HTTPResponse(status=204)
我认为可以改进的地方很少:
1)
ClientSession
的实例是一个会话对象。此会话包含连接池,但它不是 "session_pool" 本身。我建议将 http_session_pool
重命名为 http_session
或者 client_session
.
2)
会话的 close()
方法 is a coroutine。你应该等待它:
await app.client_session.close()
甚至更好(恕我直言),而不是考虑如何正确 open/close 会话使用标准异步上下文管理器等待 __aenter__
/ __aexit__
:
# Open the session once at startup by awaiting __aenter__ ...
@app.listener('before_server_start')
async def before_server_start(app, loop):
# ...
app.client_session = await aiohttp.ClientSession().__aenter__()
# ... and close it at shutdown by awaiting __aexit__.
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
await app.client_session.__aexit__(None, None, None)
# ...
3)
关注this info:
However, if the event loop is stopped before the underlying connection
is closed, an ResourceWarning: unclosed transport
warning is emitted
(when warnings are enabled).
To avoid this situation, a small delay must be added before closing
the event loop to allow any open underlying connections to close.
我不确定在你的情况下它是强制性的,但在 after_server_stop
中添加 await asyncio.sleep(0)
作为文档建议没有什么不好:
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
# ...
# Yield to the event loop once so open underlying connections can finish closing.
await asyncio.sleep(0) # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown
更新:
实现 __aenter__
/ __aexit__
的 Class 可以用作 async context manager (可以在 async with
语句中使用)。它允许在执行内部块之前和之后执行一些操作。这与常规上下文管理器非常相似,但 asyncio
相关。与常规上下文管理器异步一样,可以直接使用(无需 async with
)手动等待 __aenter__
/ __aexit__
.
为什么我认为使用 __aenter__
/ __aexit__
手动 create/free 会话比使用 close()
更好?因为我们不应该担心 __aenter__
/ __aexit__
中实际发生的事情。想象一下,在 aiohttp
的未来版本中,会话的创建将随着等待 open()
的需要而改变。如果您将使用 __aenter__
/ __aexit__
,则无需以某种方式更改您的代码。
我在 Google 上搜索有关如何在我的代码触发此警告消息后重用 aiohttp ClientSession 实例后发现了这个问题:UserWarning:在协程之外创建客户端会话是一个非常危险的想法
这段代码虽然有关联,但可能无法解决上述问题。我是 asyncio 和 aiohttp 的新手,所以这可能不是最佳实践。这是我在阅读了很多看似矛盾的信息后能想到的最好的。
我创建了一个 class ResourceManager,取自 Python 打开上下文的文档。
ResourceManager 实例通过魔术方法 __aenter__
和 __aexit__
以及 BaseScraper.set_session 和 BaseScraper.close_session 包装器方法处理 aiohttp ClientSession 实例的打开和关闭。
我能够通过以下代码重用 ClientSession 实例。
BaseScraper class 也有验证方法。依赖lxml第三方包
import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack
import aiohttp
import lxml.html
class ResourceManager(AbstractContextManager):
    """Acquire a resource from *scraper*, validate it, and guarantee release.

    Pattern adapted from the ``contextlib`` documentation (ExitStack-based
    cleanup-on-error).  ``check_resource_ok`` may be supplied to reject a
    freshly acquired resource; by default everything passes.
    """

    def __init__(self, scraper, check_resource_ok=None):
        self.acquire_resource = scraper.acquire_resource
        self.release_resource = scraper.release_resource
        if check_resource_ok is None:
            # Default validator: accept any resource.
            def check_resource_ok(resource):
                return True
        self.check_resource_ok = check_resource_ok

    @contextmanager
    def _cleanup_on_error(self):
        with ExitStack() as stack:
            # Arrange for our own __exit__ (i.e. release) to run on error.
            stack.push(self)
            yield
            # Validation completed without raising — keep the resource and
            # cancel the pending cleanup.
            stack.pop_all()

    def __enter__(self):
        res = self.acquire_resource()
        with self._cleanup_on_error():
            if not self.check_resource_ok(res):
                raise RuntimeError("Failed validation for {!r}".format(res))
        return res

    def __exit__(self, *exc_details):
        # Release logic lives in one place — no duplication needed here.
        self.release_resource()
class BaseScraper:
    """Scraper that reuses one aiohttp ClientSession across many requests.

    Session lifetime is managed by ``set_session``/``close_session``, which
    ``ResourceManager`` calls through the ``acquire_resource`` /
    ``release_resource`` aliases assigned in ``__init__``.
    """

    login_url = ""
    login_data = dict()  # dict of key, value pairs to fill the login form
    # NOTE(review): calling asyncio.get_event_loop() at class-definition time
    # is deprecated outside a running loop on modern Python; kept here to
    # preserve the original design.
    loop = asyncio.get_event_loop()

    def __init__(self, urls):
        self.urls = urls
        # Aliases consumed by ResourceManager.
        self.acquire_resource = self.set_session
        self.release_resource = self.close_session

    async def _set_session(self):
        # __aenter__ is awaited directly so the session can be reused outside
        # an ``async with`` block.
        self.session = await aiohttp.ClientSession().__aenter__()

    def set_session(self):
        """Create the shared ClientSession; return self for ``with ... as``."""
        self.loop.run_until_complete(self._set_session())
        return self  # variable after "as" becomes instance of BaseScraper

    async def _close_session(self):
        await self.session.__aexit__(None, None, None)

    def close_session(self):
        """Close the shared ClientSession from synchronous code."""
        self.loop.run_until_complete(self._close_session())

    def __call__(self):
        """Fetch all configured URLs; return list of (url, body) pairs."""
        return self.loop.run_until_complete(self._fetch())

    async def _get(self, url):
        async with self.session.get(url) as response:
            result = await response.read()
            return url, result

    async def _fetch(self):
        tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
        start = time()
        results = await asyncio.gather(*tasks)
        # BUG FIX: the original referenced the module-level global ``urls``
        # here, which raises NameError when the class is used as a library.
        print(
            "time elapsed: {} seconds \nurls count: {}".format(
                time() - start, len(self.urls)
            )
        )
        return results

    @property
    def form(self):
        """Create and return form for authentication."""
        form = aiohttp.FormData(self.login_data)
        url, login_page = self.loop.run_until_complete(
            self._get(self.login_url)
        )
        login_html = lxml.html.fromstring(login_page)
        hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
        login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
        for key, value in login_form.items():
            form.add_field(key, value)
        return form

    async def _login(self, form):
        async with self.session.post(self.login_url, data=form) as response:
            if response.status != 200:
                response.raise_for_status()
            # BUG FIX: the original printed an undefined name ``url``.
            print("logged into {}".format(self.login_url))
            # Redundant inside ``async with`` but harmless; kept as-is.
            await response.release()

    def login(self):
        """Submit the login form synchronously."""
        self.loop.run_until_complete(self._login(self.form))
if __name__ == "__main__":
    # Small driver: scrape the same page ten times through one shared session.
    urls = ("http://example.com",) * 10
    with ResourceManager(BaseScraper(urls)) as scraper:
        for page_url, page_html in scraper():
            print(page_url, len(page_html))
aiohttp 中似乎没有会话池。
// 只是 post 一些官方文档。
持久会话
这里是persistent-session
官方网站的使用演示
https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session
# Register the cleanup context so the session lives for the whole app lifetime.
app.cleanup_ctx.append(persistent_session)
# Everything before ``yield`` runs at startup, everything after at shutdown.
async def persistent_session(app):
app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
yield
await session.close()
async def my_request_handler(request):
# Reuse the single application-wide session instead of creating a new one.
session = request.app['PERSISTENT_SESSION']
async with session.get("http://python.org") as resp:
print(resp.status)
//TODO:完整的可运行演示代码
连接池
它有一个连接池:
https://docs.aiohttp.org/en/latest/client_advanced.html#connectors
# A ClientSession keeps its connection pool in a connector; tune limits here.
conn = aiohttp.TCPConnector()
#conn = aiohttp.TCPConnector(limit=30)
#conn = aiohttp.TCPConnector(limit=0) # nolimit, default is 100.
#conn = aiohttp.TCPConnector(limit_per_host=30) # default is 0
session = aiohttp.ClientSession(connector=conn)
文档说要重用 ClientSession:
Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.
A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.1
但是文档中似乎没有任何关于如何做到这一点的解释?有一个可能相关的示例,但它没有说明如何在别处重用池:http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing
这样的事情是正确的做法吗?
# NOTE(review): snippet quoted from the question — indentation was lost in the paste.
# Sanic listeners create shared pools at startup and tear them down at stop.
@app.listener('before_server_start')
async def before_server_start(app, loop):
app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
app.http_session_pool = aiohttp.ClientSession()
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
# NOTE(review): ClientSession.close() is a coroutine — this call is not awaited
# (the answer below addresses exactly this).
app.http_session_pool.close()
app.pg_pool.close()
@app.post("/api/register")
async def register(request):
# json validation
async with app.pg_pool.acquire() as pg:
await pg.execute() # create unactivated user in db
# NOTE(review): entering the session as a context manager closes it on exit,
# so a second request through this handler would use a closed session.
async with app.http_session_pool as session:
# TODO send activation email using SES API
async with session.post('http://httpbin.org/post', data=b'data') as resp:
print(resp.status)
print(await resp.text())
return HTTPResponse(status=204)
我认为可以改进的地方很少:
1)
ClientSession
的实例是一个会话对象。此会话包含连接池,但它不是 "session_pool" 本身。我建议将 http_session_pool
重命名为 http_session
或者 client_session
.
2)
会话的 close()
方法 is a coroutine。你应该等待它:
await app.client_session.close()
甚至更好(恕我直言),而不是考虑如何正确 open/close 会话使用标准异步上下文管理器等待 __aenter__
/ __aexit__
:
# Open the session once at startup by awaiting __aenter__ ...
@app.listener('before_server_start')
async def before_server_start(app, loop):
# ...
app.client_session = await aiohttp.ClientSession().__aenter__()
# ... and close it at shutdown by awaiting __aexit__.
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
await app.client_session.__aexit__(None, None, None)
# ...
3)
关注this info:
However, if the event loop is stopped before the underlying connection is closed, an
ResourceWarning: unclosed transport
warning is emitted (when warnings are enabled).To avoid this situation, a small delay must be added before closing the event loop to allow any open underlying connections to close.
我不确定在你的情况下它是强制性的,但在 after_server_stop
中添加 await asyncio.sleep(0)
作为文档建议没有什么不好:
@app.listener('after_server_stop')
async def after_server_stop(app, loop):
# ...
# Yield to the event loop once so open underlying connections can finish closing.
await asyncio.sleep(0) # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown
更新:
实现__aenter__
/ __aexit__
的 Class 可以用作 async context manager (可以在 async with
语句中使用)。它允许在执行内部块之前和之后执行一些操作。这与常规上下文管理器非常相似,但 asyncio
相关。与常规上下文管理器异步一样,可以直接使用(无需 async with
)手动等待 __aenter__
/ __aexit__
.
为什么我认为使用 __aenter__
/ __aexit__
手动 create/free 会话比使用 close()
更好?因为我们不应该担心 __aenter__
/ __aexit__
中实际发生的事情。想象一下,在 aiohttp
的未来版本中,会话的创建将随着等待 open()
的需要而改变。如果您将使用 __aenter__
/ __aexit__
,则无需以某种方式更改您的代码。
我在 Google 上搜索有关如何在我的代码触发此警告消息后重用 aiohttp ClientSession 实例后发现了这个问题:UserWarning:在协程之外创建客户端会话是一个非常危险的想法
这段代码虽然有关联,但可能无法解决上述问题。我是 asyncio 和 aiohttp 的新手,所以这可能不是最佳实践。这是我在阅读了很多看似矛盾的信息后能想到的最好的。
我创建了一个 class ResourceManager,取自 Python 打开上下文的文档。
ResourceManager 实例通过魔术方法 __aenter__
和 __aexit__
以及 BaseScraper.set_session 和 BaseScraper.close_session 包装器方法处理 aiohttp ClientSession 实例的打开和关闭。
我能够通过以下代码重用 ClientSession 实例。
BaseScraper class 也有验证方法。依赖lxml第三方包
import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack
import aiohttp
import lxml.html
class ResourceManager(AbstractContextManager):
    """Acquire a resource from *scraper*, validate it, and guarantee release.

    Pattern adapted from the ``contextlib`` documentation (ExitStack-based
    cleanup-on-error).  ``check_resource_ok`` may be supplied to reject a
    freshly acquired resource; by default everything passes.
    """

    def __init__(self, scraper, check_resource_ok=None):
        self.acquire_resource = scraper.acquire_resource
        self.release_resource = scraper.release_resource
        if check_resource_ok is None:
            # Default validator: accept any resource.
            def check_resource_ok(resource):
                return True
        self.check_resource_ok = check_resource_ok

    @contextmanager
    def _cleanup_on_error(self):
        with ExitStack() as stack:
            # Arrange for our own __exit__ (i.e. release) to run on error.
            stack.push(self)
            yield
            # Validation completed without raising — keep the resource and
            # cancel the pending cleanup.
            stack.pop_all()

    def __enter__(self):
        res = self.acquire_resource()
        with self._cleanup_on_error():
            if not self.check_resource_ok(res):
                raise RuntimeError("Failed validation for {!r}".format(res))
        return res

    def __exit__(self, *exc_details):
        # Release logic lives in one place — no duplication needed here.
        self.release_resource()
class BaseScraper:
    """Scraper that reuses one aiohttp ClientSession across many requests.

    Session lifetime is managed by ``set_session``/``close_session``, which
    ``ResourceManager`` calls through the ``acquire_resource`` /
    ``release_resource`` aliases assigned in ``__init__``.
    """

    login_url = ""
    login_data = dict()  # dict of key, value pairs to fill the login form
    # NOTE(review): calling asyncio.get_event_loop() at class-definition time
    # is deprecated outside a running loop on modern Python; kept here to
    # preserve the original design.
    loop = asyncio.get_event_loop()

    def __init__(self, urls):
        self.urls = urls
        # Aliases consumed by ResourceManager.
        self.acquire_resource = self.set_session
        self.release_resource = self.close_session

    async def _set_session(self):
        # __aenter__ is awaited directly so the session can be reused outside
        # an ``async with`` block.
        self.session = await aiohttp.ClientSession().__aenter__()

    def set_session(self):
        """Create the shared ClientSession; return self for ``with ... as``."""
        self.loop.run_until_complete(self._set_session())
        return self  # variable after "as" becomes instance of BaseScraper

    async def _close_session(self):
        await self.session.__aexit__(None, None, None)

    def close_session(self):
        """Close the shared ClientSession from synchronous code."""
        self.loop.run_until_complete(self._close_session())

    def __call__(self):
        """Fetch all configured URLs; return list of (url, body) pairs."""
        return self.loop.run_until_complete(self._fetch())

    async def _get(self, url):
        async with self.session.get(url) as response:
            result = await response.read()
            return url, result

    async def _fetch(self):
        tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
        start = time()
        results = await asyncio.gather(*tasks)
        # BUG FIX: the original referenced the module-level global ``urls``
        # here, which raises NameError when the class is used as a library.
        print(
            "time elapsed: {} seconds \nurls count: {}".format(
                time() - start, len(self.urls)
            )
        )
        return results

    @property
    def form(self):
        """Create and return form for authentication."""
        form = aiohttp.FormData(self.login_data)
        url, login_page = self.loop.run_until_complete(
            self._get(self.login_url)
        )
        login_html = lxml.html.fromstring(login_page)
        hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
        login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
        for key, value in login_form.items():
            form.add_field(key, value)
        return form

    async def _login(self, form):
        async with self.session.post(self.login_url, data=form) as response:
            if response.status != 200:
                response.raise_for_status()
            # BUG FIX: the original printed an undefined name ``url``.
            print("logged into {}".format(self.login_url))
            # Redundant inside ``async with`` but harmless; kept as-is.
            await response.release()

    def login(self):
        """Submit the login form synchronously."""
        self.loop.run_until_complete(self._login(self.form))
if __name__ == "__main__":
    # Small driver: scrape the same page ten times through one shared session.
    urls = ("http://example.com",) * 10
    with ResourceManager(BaseScraper(urls)) as scraper:
        for page_url, page_html in scraper():
            print(page_url, len(page_html))
aiohttp 中似乎没有会话池。
// 只是 post 一些官方文档。
持久会话
这里是persistent-session
官方网站的使用演示
https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session
# Register the cleanup context so the session lives for the whole app lifetime.
app.cleanup_ctx.append(persistent_session)
# Everything before ``yield`` runs at startup, everything after at shutdown.
async def persistent_session(app):
app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
yield
await session.close()
async def my_request_handler(request):
# Reuse the single application-wide session instead of creating a new one.
session = request.app['PERSISTENT_SESSION']
async with session.get("http://python.org") as resp:
print(resp.status)
//TODO:完整的可运行演示代码
连接池
它有一个连接池:
https://docs.aiohttp.org/en/latest/client_advanced.html#connectors
# A ClientSession keeps its connection pool in a connector; tune limits here.
conn = aiohttp.TCPConnector()
#conn = aiohttp.TCPConnector(limit=30)
#conn = aiohttp.TCPConnector(limit=0) # nolimit, default is 100.
#conn = aiohttp.TCPConnector(limit_per_host=30) # default is 0
session = aiohttp.ClientSession(connector=conn)