How to reuse aiohttp ClientSession pool?

The docs say to reuse the ClientSession:

Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.

A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

But there doesn't seem to be any explanation in the docs of how to actually do this. There is one example that may be relevant, but it doesn't show how to reuse the pool elsewhere: http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing

Is something like this the right way to do it?

@app.listener('before_server_start')
async def before_server_start(app, loop):
    app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
    app.http_session_pool = aiohttp.ClientSession()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    app.http_session_pool.close()
    app.pg_pool.close()


@app.post("/api/register")
async def register(request):
    # json validation
    async with app.pg_pool.acquire() as pg:
        await pg.execute()  # create unactivated user in db
        async with app.http_session_pool as session:
            # TODO send activation email using SES API
            async with session.post('http://httpbin.org/post', data=b'data') as resp:
                print(resp.status)
                print(await resp.text())
        return HTTPResponse(status=204)

A few things I think could be improved:

1)

An instance of ClientSession is a single session object. This session contains a pool of connections, but it is not a "session_pool" itself. I would suggest renaming http_session_pool to http_session or maybe client_session.

2)

The session's close() method is a coroutine. You should await it:

await app.client_session.close()

Or even better (IMHO), instead of thinking about how to correctly open/close the session, use the standard async context manager protocol and await __aenter__ / __aexit__:

@app.listener('before_server_start')
async def before_server_start(app, loop):
    # ...
    app.client_session = await aiohttp.ClientSession().__aenter__()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    await app.client_session.__aexit__(None, None, None)
    # ...

3)

Pay attention to this info:

However, if the event loop is stopped before the underlying connection is closed, an ResourceWarning: unclosed transport warning is emitted (when warnings are enabled).

To avoid this situation, a small delay must be added before closing the event loop to allow any open underlying connections to close.

I'm not sure it's mandatory in your case, but there's nothing bad in adding await asyncio.sleep(0) inside after_server_stop as the docs advise:

@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    # ...
    await asyncio.sleep(0)  # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown

Update:

A class that implements __aenter__ / __aexit__ can be used as an async context manager (it can be used in an async with statement). It allows you to perform some actions before and after the inner block is executed. This is very similar to regular context managers, but asyncio-related. And just like a regular context manager, an async one can be used directly (without async with) by manually awaiting __aenter__ / __aexit__.

Why do I think it's better to create/free the session manually with __aenter__ / __aexit__ rather than with close()? Because we shouldn't have to worry about what actually happens inside __aenter__ / __aexit__. Imagine that in some future version of aiohttp the creation of a session changes so that an open() needs to be awaited. If you use __aenter__ / __aexit__, you won't need to change your code at all.
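For illustration, here is a minimal sketch of my own (not from the original answer; the httpbin URL is just a placeholder) showing that manually awaiting __aenter__ / __aexit__ does the same job as an async with block:

import asyncio

import aiohttp


async def with_async_with():
    # The usual way: async with drives __aenter__ / __aexit__ for us.
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)


async def with_manual_enter_exit():
    # The same thing done by hand, as suggested for the server listeners above.
    session = await aiohttp.ClientSession().__aenter__()
    try:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
    finally:
        await session.__aexit__(None, None, None)


loop = asyncio.get_event_loop()
loop.run_until_complete(with_async_with())
loop.run_until_complete(with_manual_enter_exit())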

I found this question after googling how to reuse an aiohttp ClientSession instance once my code had triggered this warning message: UserWarning: Creating a client session outside of coroutine is a very dangerous idea

This code may not solve the above problem, though it is related. I am new to asyncio and aiohttp, so this may not be best practice. It's the best I could come up with after reading a lot of seemingly conflicting information.

I created a ResourceManager class, taken from the Python docs, that opens a context.

The ResourceManager instance handles the opening and closing of the aiohttp ClientSession instance via the magic methods __aenter__ / __aexit__ together with the BaseScraper.set_session and BaseScraper.close_session wrapper methods.

I was able to reuse the ClientSession instance with the following code.

The BaseScraper class also has methods for authentication. It depends on the lxml third-party package.

import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack

import aiohttp
import lxml.html


class ResourceManager(AbstractContextManager):
    # Code taken from Python docs: 29.6.2.4. of https://docs.python.org/3.6/library/contextlib.html

    def __init__(self, scraper, check_resource_ok=None):
        self.acquire_resource = scraper.acquire_resource
        self.release_resource = scraper.release_resource
        if check_resource_ok is None:

            def check_resource_ok(resource):
                return True

        self.check_resource_ok = check_resource_ok

    @contextmanager
    def _cleanup_on_error(self):
        with ExitStack() as stack:
            stack.push(self)
            yield
            # The validation check passed and didn't raise an exception
            # Accordingly, we want to keep the resource, and pass it
            # back to our caller
            stack.pop_all()

    def __enter__(self):
        resource = self.acquire_resource()
        with self._cleanup_on_error():
            if not self.check_resource_ok(resource):
                msg = "Failed validation for {!r}"
                raise RuntimeError(msg.format(resource))
        return resource

    def __exit__(self, *exc_details):
        # We don't need to duplicate any of our resource release logic
        self.release_resource()


class BaseScraper:
    login_url = ""
    login_data = dict()  # dict of key, value pairs to fill the login form
    loop = asyncio.get_event_loop()

    def __init__(self, urls):
        self.urls = urls
        self.acquire_resource = self.set_session
        self.release_resource = self.close_session

    async def _set_session(self):
        self.session = await aiohttp.ClientSession().__aenter__()

    def set_session(self):
        set_session_attr = self.loop.create_task(self._set_session())
        self.loop.run_until_complete(set_session_attr)
        return self  # variable after "as" becomes instance of BaseScraper

    async def _close_session(self):
        await self.session.__aexit__(None, None, None)

    def close_session(self):
        close_session = self.loop.create_task(self._close_session())
        self.loop.run_until_complete(close_session)

    def __call__(self):
        fetch_urls = self.loop.create_task(self._fetch())
        return self.loop.run_until_complete(fetch_urls)

    async def _get(self, url):
        async with self.session.get(url) as response:
            result = await response.read()
        return url, result

    async def _fetch(self):
        tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
        start = time()
        results = await asyncio.gather(*tasks)
        print(
            "time elapsed: {} seconds \nurls count: {}".format(
                time() - start, len(self.urls)
            )
        )
        return results

    @property
    def form(self):
        """Create and return form for authentication."""
        form = aiohttp.FormData(self.login_data)
        get_login_page = self.loop.create_task(self._get(self.login_url))
        url, login_page = self.loop.run_until_complete(get_login_page)

        login_html = lxml.html.fromstring(login_page)
        hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
        login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
        for key, value in login_form.items():
            form.add_field(key, value)
        return form

    async def _login(self, form):
        async with self.session.post(self.login_url, data=form) as response:
            if response.status != 200:
                response.raise_for_status()
            print("logged into {}".format(url))
            await response.release()

    def login(self):
        post_login_form = self.loop.create_task(self._login(self.form))
        self.loop.run_until_complete(post_login_form)


if __name__ == "__main__":
    urls = ("http://example.com",) * 10
    base_scraper = BaseScraper(urls)
    with ResourceManager(base_scraper) as scraper:
        for url, html in scraper():
            print(url, len(html))

There doesn't seem to be a session pool in aiohttp.
// Just posting some of the official docs here.


Persistent session

Here is the official usage demo of a persistent session:
https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session

app.cleanup_ctx.append(persistent_session)

async def persistent_session(app):
    app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
    yield
    await session.close()

async def my_request_handler(request):
    session = request.app['PERSISTENT_SESSION']
    async with session.get("http://python.org") as resp:
        print(resp.status)

// TODO: a complete runnable demo
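In the meantime, here is a minimal runnable sketch of my own (not from the docs; the route path "/" and port 8080 are arbitrary choices) that wires the persistent-session pattern above into an aiohttp web application:

import aiohttp
from aiohttp import web


async def persistent_session(app):
    # startup part: create one shared ClientSession for the whole application
    app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
    yield
    # cleanup part: close the shared session when the application shuts down
    await session.close()


async def my_request_handler(request):
    # reuse the shared session instead of creating one per request
    session = request.app['PERSISTENT_SESSION']
    async with session.get('http://python.org') as resp:
        return web.Response(text='upstream status: {}'.format(resp.status))


app = web.Application()
app.cleanup_ctx.append(persistent_session)
app.router.add_get('/', my_request_handler)

if __name__ == '__main__':
    web.run_app(app, port=8080)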

Connection pool

It has a connection pool:
https://docs.aiohttp.org/en/latest/client_advanced.html#connectors

conn = aiohttp.TCPConnector()
#conn = aiohttp.TCPConnector(limit=30)
#conn = aiohttp.TCPConnector(limit=0)  # nolimit, default is 100.
#conn = aiohttp.TCPConnector(limit_per_host=30) # default is 0

session = aiohttp.ClientSession(connector=conn)
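And a small usage sketch of my own (the URLs and the limit value are arbitrary assumptions, not from the docs) showing the session configured with that connector being shared across several concurrent requests:

import asyncio

import aiohttp


async def fetch(session, url):
    async with session.get(url) as resp:
        return url, resp.status


async def main():
    # at most 30 connections in the pool; tune this to your needs
    conn = aiohttp.TCPConnector(limit=30)
    async with aiohttp.ClientSession(connector=conn) as session:
        urls = ['http://httpbin.org/get'] * 5
        results = await asyncio.gather(*(fetch(session, u) for u in urls))
        for url, status in results:
            print(url, status)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())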