python aiohttp 表现得像单线程应用程序
python aiohttp behaving like single threaded application
我有以下简单的 Python Web 服务器应用程序。当我触发 /sleep 调用时,在睡眠时间结束并返回响应之前,所有其他对 /quick 的调用都会被阻塞。我不确定这段代码有什么问题。有人可以澄清一下吗?
from aiohttp import web
import asyncio
import time
async def handle(request):
    """Reply with a plain-text greeting for the requester.

    The greeting uses the ``name`` entry of the request's match info and
    falls back to "Anonymous" when that entry is absent.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)
async def sleephandle(request):
    """Reply with a greeting after a 12-second synchronous sleep.

    ``time.sleep`` is a blocking call: it does not yield control back to the
    event loop, so no other request can be served by this loop while the
    sleep is in progress.
    """
    name = request.match_info.get('name', "Anonymous")
    # Trivializing here; actual code has a transition from async to sync.
    # NOTE: this call blocks the whole event loop for its full duration.
    time.sleep(12)
    text = "Hello, " + name
    return web.Response(text=text)
async def init(loop):
    """Create the application, register its routes and start listening.

    Args:
        loop: Event loop used for the application and to create the server.

    Returns:
        The server object produced by ``loop.create_server``.
    """
    application = web.Application(loop=loop)
    router = application.router
    router.add_get('/quick', handle)
    router.add_get('/sleep', sleephandle)
    server = await loop.create_server(
        application.make_handler(), '127.0.0.1', 8000
    )
    print('server started')
    return server
def create_server():
    """Start the server on the current event loop and keep the loop running."""
    event_loop = asyncio.get_event_loop()
    startup = init(event_loop)
    # Finish the asynchronous setup first, then serve until the loop is stopped.
    event_loop.run_until_complete(startup)
    event_loop.run_forever()
# Executed at module level: start the server and enter the event loop.
create_server()
我的解决方案的关键思想是使用 loop.run_in_executor
和适合您的情况的 Pool。您可以通过以下方式解决问题:
from aiohttp import web
import asyncio
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
def blocking_code(delay=12):
    """Stand-in for some long-running, blocking code.

    Args:
        delay: Seconds to block the calling thread for. Defaults to 12,
            the duration that was previously hard-coded.

    Returns:
        The fixed string "!!!!".
    """
    time.sleep(delay)
    return "!!!!"
async def blocking_code_task(loop: asyncio.BaseEventLoop, request: web.Request):
    """Run ``blocking_code`` in the app's worker pool and log its result.

    Written as a coroutine so it can be wrapped in an asyncio task.

    Args:
        loop: Event loop whose ``run_in_executor`` dispatches the work.
        request: Request whose application stores the executor under the
            "workers_pool" key.
    """
    pool = request.app["workers_pool"]
    r = await loop.run_in_executor(pool, blocking_code)
    logging.info(f"{datetime.now()}: {r}")
async def handle(request: web.Request):
    """Reply with a plain-text greeting for the requester.

    The greeting uses the ``name`` entry of the request's match info and
    falls back to "Anonymous" when that entry is absent.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)
async def sleephandle(request: web.Request):
    """Run the blocking code in the worker pool, then greet the requester.

    The handler awaits the executor future, so the response is sent only
    after the blocking code has returned; its return value is appended to
    the greeting.
    """
    name = request.match_info.get('name', "Anonymous")
    pool = request.app["workers_pool"]
    # Awaiting suspends only this handler until the pool delivers the result.
    outcome = await asyncio.get_event_loop().run_in_executor(pool, blocking_code)
    return web.Response(text="Hello, " + name + outcome)
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


async def fast_sleep_answer(request: web.Request):
    """Respond immediately and do the blocking work in a separate asyncio task.

    The blocking work is scheduled as a background task and is not awaited,
    so the response does not wait for it. The task is kept in a module-level
    set until it completes so that it cannot be collected mid-run.
    """
    name = request.match_info.get('name', "Anonymous")
    # Inside a coroutine a loop is always running; this is the preferred accessor.
    loop = asyncio.get_running_loop()
    # Use this form when the response does not need to wait for the result.
    task = asyncio.create_task(blocking_code_task(loop, request))
    _BACKGROUND_TASKS.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    text = "Fast answer" + name
    return web.Response(text=text)
async def on_shutdown(app):
    """Shutdown hook: close the worker pool stored on the application.

    Args:
        app: Application holding the executor under the "workers_pool" key.
    """
    workers = app["workers_pool"]
    workers.shutdown()
    logging.info(f"{datetime.now()}: Pool is closed")
async def init(args=None):
    """Build the application: worker pool, shutdown hook and routes.

    This is the original setup code adapted for newer aiohttp versions.

    Args:
        args: Not used by this function.

    Returns:
        The configured ``web.Application``.
    """
    workers = ThreadPoolExecutor(8)
    application = web.Application()
    # Can be a thread pool or a process pool.
    application["workers_pool"] = workers
    # Close the pool when the application shuts down.
    application.on_shutdown.append(on_shutdown)
    for path, handler in (
        ('/quick', handle),
        ('/sleep', sleephandle),
        ('/fast', fast_sleep_answer),
    ):
        application.router.add_get(path, handler)
    return application
# The better way to run the app
# (the file is named x.py here; on Linux the command will be python3):
# python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    application = init()
    web.run_app(application, host="0.0.0.0", port=8080)
所有阻塞的 I/O 操作都放在 ThreadPoolExecutor 中执行。如果您的任务是 CPU 密集型的,请使用 ProcessPoolExecutor。我展示了两种情况:1) 无法立即应答、需要等待结果时;2) 可以先应答、然后在后台完成所有工作时。
我有以下简单的 Python Web 服务器应用程序。当我触发 /sleep 调用时,在睡眠时间结束并返回响应之前,所有其他对 /quick 的调用都会被阻塞。我不确定这段代码有什么问题。有人可以澄清一下吗?
from aiohttp import web
import asyncio
import time
async def handle(request):
    """Reply with a plain-text greeting for the requester.

    The greeting uses the ``name`` entry of the request's match info and
    falls back to "Anonymous" when that entry is absent.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)
async def sleephandle(request):
    """Reply with a greeting after a 12-second synchronous sleep.

    ``time.sleep`` is a blocking call: it does not yield control back to the
    event loop, so no other request can be served by this loop while the
    sleep is in progress.
    """
    name = request.match_info.get('name', "Anonymous")
    # Trivializing here; actual code has a transition from async to sync.
    # NOTE: this call blocks the whole event loop for its full duration.
    time.sleep(12)
    text = "Hello, " + name
    return web.Response(text=text)
async def init(loop):
    """Create the application, register its routes and start listening.

    Args:
        loop: Event loop used for the application and to create the server.

    Returns:
        The server object produced by ``loop.create_server``.
    """
    application = web.Application(loop=loop)
    router = application.router
    router.add_get('/quick', handle)
    router.add_get('/sleep', sleephandle)
    server = await loop.create_server(
        application.make_handler(), '127.0.0.1', 8000
    )
    print('server started')
    return server
def create_server():
    """Start the server on the current event loop and keep the loop running."""
    event_loop = asyncio.get_event_loop()
    startup = init(event_loop)
    # Finish the asynchronous setup first, then serve until the loop is stopped.
    event_loop.run_until_complete(startup)
    event_loop.run_forever()
# Executed at module level: start the server and enter the event loop.
create_server()
我的解决方案的关键思想是使用 loop.run_in_executor
和适合您的情况的 Pool。您可以通过以下方式解决问题:
from aiohttp import web
import asyncio
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
def blocking_code(delay=12):
    """Stand-in for some long-running, blocking code.

    Args:
        delay: Seconds to block the calling thread for. Defaults to 12,
            the duration that was previously hard-coded.

    Returns:
        The fixed string "!!!!".
    """
    time.sleep(delay)
    return "!!!!"
async def blocking_code_task(loop: asyncio.BaseEventLoop, request: web.Request):
    """Run ``blocking_code`` in the app's worker pool and log its result.

    Written as a coroutine so it can be wrapped in an asyncio task.

    Args:
        loop: Event loop whose ``run_in_executor`` dispatches the work.
        request: Request whose application stores the executor under the
            "workers_pool" key.
    """
    pool = request.app["workers_pool"]
    r = await loop.run_in_executor(pool, blocking_code)
    logging.info(f"{datetime.now()}: {r}")
async def handle(request: web.Request):
    """Reply with a plain-text greeting for the requester.

    The greeting uses the ``name`` entry of the request's match info and
    falls back to "Anonymous" when that entry is absent.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text="Hello, " + who)
async def sleephandle(request: web.Request):
    """Run the blocking code in the worker pool, then greet the requester.

    The handler awaits the executor future, so the response is sent only
    after the blocking code has returned; its return value is appended to
    the greeting.
    """
    name = request.match_info.get('name', "Anonymous")
    pool = request.app["workers_pool"]
    # Awaiting suspends only this handler until the pool delivers the result.
    outcome = await asyncio.get_event_loop().run_in_executor(pool, blocking_code)
    return web.Response(text="Hello, " + name + outcome)
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


async def fast_sleep_answer(request: web.Request):
    """Respond immediately and do the blocking work in a separate asyncio task.

    The blocking work is scheduled as a background task and is not awaited,
    so the response does not wait for it. The task is kept in a module-level
    set until it completes so that it cannot be collected mid-run.
    """
    name = request.match_info.get('name', "Anonymous")
    # Inside a coroutine a loop is always running; this is the preferred accessor.
    loop = asyncio.get_running_loop()
    # Use this form when the response does not need to wait for the result.
    task = asyncio.create_task(blocking_code_task(loop, request))
    _BACKGROUND_TASKS.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    text = "Fast answer" + name
    return web.Response(text=text)
async def on_shutdown(app):
    """Shutdown hook: close the worker pool stored on the application.

    Args:
        app: Application holding the executor under the "workers_pool" key.
    """
    workers = app["workers_pool"]
    workers.shutdown()
    logging.info(f"{datetime.now()}: Pool is closed")
async def init(args=None):
    """Build the application: worker pool, shutdown hook and routes.

    This is the original setup code adapted for newer aiohttp versions.

    Args:
        args: Not used by this function.

    Returns:
        The configured ``web.Application``.
    """
    workers = ThreadPoolExecutor(8)
    application = web.Application()
    # Can be a thread pool or a process pool.
    application["workers_pool"] = workers
    # Close the pool when the application shuts down.
    application.on_shutdown.append(on_shutdown)
    for path, handler in (
        ('/quick', handle),
        ('/sleep', sleephandle),
        ('/fast', fast_sleep_answer),
    ):
        application.router.add_get(path, handler)
    return application
# The better way to run the app
# (the file is named x.py here; on Linux the command will be python3):
# python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    application = init()
    web.run_app(application, host="0.0.0.0", port=8080)
所有阻塞的 I/O 操作都放在 ThreadPoolExecutor 中执行。如果您的任务是 CPU 密集型的,请使用 ProcessPoolExecutor。我展示了两种情况:1) 无法立即应答、需要等待结果时;2) 可以先应答、然后在后台完成所有工作时。