"RuntimeError: This event loop is already running" when combine asyncio and aiohttp
"RuntimeError: This event loop is already running" when combine asyncio and aiohttp
不同于,我的场景是下一个:
- 我已经有一个运行良好的 asyncio 程序,如下所示:
import asyncio
async def action():
print("action")
# use sleep to simulate real operation
# it's a daemon works endless there
await asyncio.sleep(10000)
async def main():
await action()
if __name__ == '__main__':
asyncio.run(main())
- 然后,我想为我的程序添加一个休息api。这样,客户端可以使用 api 调用查询我的程序的某些状态(例如,获取我的异步程序逻辑的一些内部状态变量):
import asyncio
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def start_api_server():
print("api")
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
web.run_app(app)
async def action():
print("action")
await asyncio.sleep(10000)
async def main():
#await action()
await(asyncio.gather(start_api_server(), action()))
if __name__ == '__main__':
asyncio.run(main())
但是,上面报告下一个堆栈:
Traceback (most recent call last):
File "svr.py", line 26, in <module>
asyncio.run(main())
File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "svr.py", line 23, in main
await(asyncio.gather(start_api_server(), action()))
File "svr.py", line 15, in start_api_server
web.run_app(app)
File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 512, in run_app
_cancel_tasks({main_task}, loop)
File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 444, in _cancel_tasks
asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
File "/usr/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
self.run_forever()
File "/usr/lib/python3.7/asyncio/base_events.py", line 521, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
问题:
看起来我们不能做上面的组合,所以我想知道在异步程序中添加休息 api 的正确方法是什么?
注意: 如果可能,我想避免 运行 另一个线程来启动 Web 服务器,除非它完全不可能。
我找到了答案,来自aiohttp source code:
- web.run_app(app) 如下:
async def _run_app(......):
runner = AppRunner(
app,
handle_signals=handle_signals,
access_log_class=access_log_class,
access_log_format=access_log_format,
access_log=access_log,
)
await runner.setup()
......
while True:
await asyncio.sleep(delay)
def run_app(......):
loop = asyncio.get_event_loop()
try:
main_task = loop.create_task(
_run_app(
app,
......
)
)
loop.run_until_complete(main_task)
但是它会直接调用loop.run_until_complete
,这使得我们无法在主程序中启动我们自己的asyncio.run()
使用的事件循环。
- 为了克服这个问题,我们应该使用一些低级函数作为下一个:
import asyncio
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def start_api_server():
print("api")
loop = asyncio.get_event_loop()
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
#web.run_app(app)
runner = web.AppRunner(app)
await runner.setup()
await loop.create_server(runner.server, '127.0.0.1', 8080)
print('Server started at http://127.0.0.1:8080...')
async def action():
print("action")
await asyncio.sleep(10000)
async def main():
#await action()
await(asyncio.gather(start_api_server(), action()))
if __name__ == '__main__':
asyncio.run(main())
以上代码重现web.run_app
中的runner setup
,并将AppRunner
中的属性server
传给asyncio.create_server
新建一个aiohttp服务器。然后可以重用asyncio.run
中的事件循环来满足我的要求。
不同于
- 我已经有一个运行良好的 asyncio 程序,如下所示:
import asyncio
async def action():
print("action")
# use sleep to simulate real operation
# it's a daemon works endless there
await asyncio.sleep(10000)
async def main():
await action()
if __name__ == '__main__':
asyncio.run(main())
- 然后,我想为我的程序添加一个休息api。这样,客户端可以使用 api 调用查询我的程序的某些状态(例如,获取我的异步程序逻辑的一些内部状态变量):
import asyncio
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def start_api_server():
print("api")
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
web.run_app(app)
async def action():
print("action")
await asyncio.sleep(10000)
async def main():
#await action()
await(asyncio.gather(start_api_server(), action()))
if __name__ == '__main__':
asyncio.run(main())
但是,上面报告下一个堆栈:
Traceback (most recent call last):
File "svr.py", line 26, in <module>
asyncio.run(main())
File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "svr.py", line 23, in main
await(asyncio.gather(start_api_server(), action()))
File "svr.py", line 15, in start_api_server
web.run_app(app)
File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 512, in run_app
_cancel_tasks({main_task}, loop)
File "/home/cake/venv/lib/python3.7/site-packages/aiohttp/web.py", line 444, in _cancel_tasks
asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
File "/usr/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
self.run_forever()
File "/usr/lib/python3.7/asyncio/base_events.py", line 521, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
问题:
看起来我们不能做上面的组合,所以我想知道在异步程序中添加休息 api 的正确方法是什么?
注意: 如果可能,我想避免 运行 另一个线程来启动 Web 服务器,除非它完全不可能。
我找到了答案,来自aiohttp source code:
- web.run_app(app) 如下:
async def _run_app(......):
runner = AppRunner(
app,
handle_signals=handle_signals,
access_log_class=access_log_class,
access_log_format=access_log_format,
access_log=access_log,
)
await runner.setup()
......
while True:
await asyncio.sleep(delay)
def run_app(......):
loop = asyncio.get_event_loop()
try:
main_task = loop.create_task(
_run_app(
app,
......
)
)
loop.run_until_complete(main_task)
但是它会直接调用loop.run_until_complete
,这使得我们无法在主程序中启动我们自己的asyncio.run()
使用的事件循环。
- 为了克服这个问题,我们应该使用一些低级函数作为下一个:
import asyncio
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
async def start_api_server():
print("api")
loop = asyncio.get_event_loop()
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
#web.run_app(app)
runner = web.AppRunner(app)
await runner.setup()
await loop.create_server(runner.server, '127.0.0.1', 8080)
print('Server started at http://127.0.0.1:8080...')
async def action():
print("action")
await asyncio.sleep(10000)
async def main():
#await action()
await(asyncio.gather(start_api_server(), action()))
if __name__ == '__main__':
asyncio.run(main())
以上代码重现web.run_app
中的runner setup
,并将AppRunner
中的属性server
传给asyncio.create_server
新建一个aiohttp服务器。然后可以重用asyncio.run
中的事件循环来满足我的要求。