"RuntimeError: This event loop is already running" when combine asyncio and aiohttp

"RuntimeError: This event loop is already running" when combine asyncio and aiohttp

不同于,我的场景是下一个:

import asyncio

async def action():
    print("action")
    # use sleep to simulate real operation
    # it's a daemon works endless there
    await asyncio.sleep(10000)

async def main():
    await action()

if __name__ == '__main__':
    asyncio.run(main())
import asyncio
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def start_api_server():
    print("api")
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])
    web.run_app(app)

async def action():
    print("action")
    await asyncio.sleep(10000)

async def main():
    #await action()
    await(asyncio.gather(start_api_server(), action()))

if __name__ == '__main__':
    asyncio.run(main())

但是,上面报告下一个堆栈:

Traceback (most recent call last):
  File "svr.py", line 26, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "svr.py", line 23, in main
    await(asyncio.gather(start_api_server(), action()))
  File "svr.py", line 15, in start_api_server
    web.run_app(app)
  File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 512, in run_app
    _cancel_tasks({main_task}, loop)
  File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 444, in _cancel_tasks
    asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 521, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

问题:

看起来我们不能做上面的组合,所以我想知道在异步程序中添加休息 api 的正确方法是什么?

注意: 如果可能,我想避免 运行 另一个线程来启动 Web 服务器,除非它完全不可能。

我找到了答案,来自aiohttp source code

  • web.run_app(app) 如下:
async def _run_app(......):

    runner = AppRunner(
        app,
        handle_signals=handle_signals,
        access_log_class=access_log_class,
        access_log_format=access_log_format,
        access_log=access_log,
    )

    await runner.setup()
    ......
    while True:
        await asyncio.sleep(delay)

def run_app(......):

    loop = asyncio.get_event_loop()
    try:
        main_task = loop.create_task(
            _run_app(
                app,
                ......
            )
        )
        loop.run_until_complete(main_task)

但是它会直接调用loop.run_until_complete,这使得我们无法在主程序中启动我们自己的asyncio.run()使用的事件循环。

  • 为了克服这个问题,我们应该使用一些低级函数作为下一个:
import asyncio
from aiohttp import web


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def start_api_server():
    print("api")
    loop = asyncio.get_event_loop()
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])
    #web.run_app(app)
    runner = web.AppRunner(app)
    await runner.setup()
    await loop.create_server(runner.server, '127.0.0.1', 8080)
    print('Server started at http://127.0.0.1:8080...')

async def action():
    print("action")
    await asyncio.sleep(10000)

async def main():
    #await action()
    await(asyncio.gather(start_api_server(), action()))

if __name__ == '__main__':
    asyncio.run(main())

以上代码重现web.run_app中的runner setup,并将AppRunner中的属性server传给asyncio.create_server新建一个aiohttp服务器。然后可以重用asyncio.run中的事件循环来满足我的要求。