同一个进程中有多个 aiohttp Application() 的 运行?

Multiple aiohttp Application()'s running in the same process?

两个 aiohttp.web.Application() 对象可以 运行 在同一个进程中,例如在不同的端口上?

我看到了一堆 aiohttp 代码示例,例如:

from aiohttp import web
app = web.Application()
app.router.add_get('/foo', foo_view, name='foo')
web.run_app(app, host='0.0.0.0', port=10000)

我想知道是否有一些等效项可以将多个 web.Applications() 同时配置为 运行。类似于:

from aiohttp import web
app1 = web.Application()
app1.router.add_get('/foo', foo_view, name='foo')
app2 = web.Application()
app2.router.add_get('/bar', bar_view, name='bar')
# This is the wishful thinking code:
web.configure_app(app1, host='0.0.0.0', port=10000)
web.configure_app(app2, host='0.0.0.0', port=10001)
web.run_apps()

我的用例是我有一个现有的 python 网络框架来做这种事情,我正在构建一个类似于 python 3.6 和 aiohttp 的原型。

我知道多个 python 服务器可以 运行 nginx(另见 http://aiohttp.readthedocs.io/en/stable/deployment.html);那不是我想要的。我想探索两个具有相同异步事件循环的 aiohttp 网络服务器的可能性,运行在同一个 python 进程中,服务于两个不同的端口。

是的,您可以 - 只需编写一些包装器并重新实现 run_app

这是一个简单的例子。 run_app 的所有特定于应用程序的部分都移到了专用的 class AppWrapperMultiApp 只负责初始化所有配置的应用程序,保持运行 循环并清理。

import asyncio
from aiohttp import web


class AppWrapper:

    def __init__(self, aioapp, port, loop):
        self.port = port
        self.aioapp = aioapp
        self.loop = loop
        self.uris = []
        self.servers = []

    def initialize(self):
        self.loop.run_until_complete(self.aioapp.startup())
        handler = self.aioapp.make_handler(loop=self.loop)

        server_creations, self.uris = web._make_server_creators(
            handler, loop=self.loop, ssl_context=None,
            host=None, port=self.port, path=None, sock=None,
            backlog=128)

        self.servers = self.loop.run_until_complete(
            asyncio.gather(*server_creations, loop=self.loop)
        )

    def shutdown(self):
        server_closures = []
        for srv in self.servers:
            srv.close()
            server_closures.append(srv.wait_closed())
        self.loop.run_until_complete(
            asyncio.gather(*server_closures, loop=self.loop))

        self.loop.run_until_complete(self.aioapp.shutdown())

    def cleanup(self):
         self.loop.run_until_complete(self.aioapp.cleanup())

    def show_info(self):
        print("======== Running on {} ========\n".format(', '.join(self.uris)))


class MultiApp:    

    def __init__(self, loop=None):
        self._apps = []
        self.user_supplied_loop = loop is not None
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop

    def configure_app(self, app, port):
        app._set_loop(self.loop)
        self._apps.append(
            AppWrapper(app, port, self.loop)
        )

    def run_all(self):
        try:
            for app in self._apps:
                app.initialize()
            try:
                for app in self._apps:
                    app.show_info()
                print("(Press CTRL+C to quit)")
                self.loop.run_forever()
            except KeyboardInterrupt:  # pragma: no cover
                pass
            finally:
                for app in self._apps:
                    app.shutdown()
        finally:
            for app in self._apps:
                app.cleanup()

        if not self.user_supplied_loop:
            self.loop.close()

注意:注意内部aiohttp方法的使用,可能会发生变化。

现在让我们使用它:

from aiohttp import web

async def handle1(request):
    return web.Response(text='SERVER 1')


async def handle2(request):
    return web.Response(text='SERVER 2')

app1 = web.Application()
app1.router.add_get('/', handle1)

app2 = web.Application()
app2.router.add_get('/', handle2)

ma = MultiApp()
ma.configure_app(app1, port=8081)
ma.configure_app(app2, port=8071)
ma.run_all()

作为旁注,再想想你为什么需要这个。在几乎所有情况下,去耦都是更好的选择。在同一进程中设置多个端点会使它们相互依赖。我想到了一个案例并且有 "good" 推理,内部统计/调试端点。

虽然上面的答案已经被采纳,这里还有一个方法:

创建 test.py:

from aiohttp import web
import asyncio
import sys

@asyncio.coroutine
def status1(request):
    return web.json_response('App1 OK')

@asyncio.coroutine
def status2(request):
    return web.json_response('App2 OK')

def start():
    try:
        loop = asyncio.get_event_loop()

        # App1
        app1 = web.Application()
        app1.router.add_get('/status', status1)
        handler1 = app1.make_handler()
        coroutine1 = loop.create_server(handler1, '0.0.0.0', 8081)
        server1 = loop.run_until_complete(coroutine1)
        address1, port1 = server1.sockets[0].getsockname()
        print('App1 started on http://{}:{}'.format(address1, port1))

        # App2
        app2 = web.Application()
        app2.router.add_get('/status', status2)
        handler2 = app2.make_handler()
        coroutine2 = loop.create_server(handler2, '0.0.0.0', 8082)
        server2 = loop.run_until_complete(coroutine2)
        address2, port2 = server2.sockets[0].getsockname()
        print('App2 started on http://{}:{}'.format(address2, port2))

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server1.close()
            loop.run_until_complete(app1.shutdown())
            loop.run_until_complete(handler1.shutdown(60.0))
            loop.run_until_complete(handler1.finish_connections(1.0))
            loop.run_until_complete(app1.cleanup())

            server2.close()
            loop.run_until_complete(app2.shutdown())
            loop.run_until_complete(handler2.shutdown(60.0))
            loop.run_until_complete(handler2.finish_connections(1.0))
            loop.run_until_complete(app2.cleanup())

        loop.close()
    except Exception as e:
        sys.stderr.write('Error: ' + format(str(e)) + "\n")
        sys.exit(1)

if __name__ == '__main__':
    start()

在终端,打开两个选项卡。在一个选项卡中,运行

python test.py

在其他选项卡中,运行

curl -X GET http://localhost:8081/status
curl -X GET http://localhost:8082/status

您会收到回复

"App1 OK"
"App2 OK"

看起来 3.0 版增加了一种以前不可用的更好的方法:https://aiohttp.readthedocs.io/en/stable/web_advanced.html#aiohttp-web-app-runners

编辑

文档(正如所指出的)有点不清楚(我也不得不自己与它作斗争)。要 运行 多个端口上的多个服务器,只需为每个站点重复文档中的代码。简而言之,您需要为每个您想要 [=28] 的 Application/Server 创建一个应用程序、AppRunner(并将其称为 setup())和一个 TCPSite(并将其称为 start()) =] 在单独的 address/port.

最简单的方法是为重复的站点设置创建一个异步函数,然后可以将它传递给您的应用程序实例和端口。我还包括了提到的 运行ners 循环退出时的清理。

希望这对您有所帮助。

import asyncio
from aiohttp import web

runners = []

async def start_site(app, address='localhost', port=8080):
    runner = web.AppRunner(app)
    runners.append(runner)
    await runner.setup()
    site = web.TCPSite(runner, address, port)
    await site.start()

loop = asyncio.get_event_loop()

loop.create_task(start_site(web.Application()))
loop.create_task(start_site(web.Application(), port=8081))
loop.create_task(start_site(web.Application(), port=8082))

try:
    loop.run_forever()
except:
    pass
finally:
    for runner in runners:
        loop.run_until_complete(runner.cleanup())