python aiohttp 表现得像单线程应用程序

python aiohttp behaving like single threaded application

我有以下简单的 python Web 服务器应用程序。当我触发 /sleep 呼叫时 - 直到睡眠时间结束和响应 returns - 所有其他 /quick 上的呼叫都被阻止。我不确定这段代码有什么问题。有人可以澄清一下吗?

from aiohttp import web
import asyncio
import time

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def sleephandle(request):
    name = request.match_info.get('name', "Anonymous")
    time.sleep(12) // trivializing here; actual code has a transition from async to sync 
    text = "Hello, " + name
    return web.Response(text=text)

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_get('/quick', handle)
    app.router.add_get('/sleep', sleephandle)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('server started')
    return srv

def create_server():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

create_server()

我的解决方案的关键思想是使用 loop.run_in_executor 和适合您的情况的 Pool。您可以通过以下方式解决问题:

from aiohttp import web
import asyncio
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


def blocking_code():
    """Some long running code"""
    time.sleep(12)
    return "!!!!"


async def blocking_code_task(loop: asyncio.BaseEventLoop, request: web.Request):
    """Wrapper to be used in asyncio Task"""
    r = await loop.run_in_executor(executor=request.app["workers_pool"], func=blocking_code)
    logging.info(f"{datetime.now()}: {r}")


async def handle(request: web.Request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)


async def sleephandle(request: web.Request):
    """We wait fore results here, then send response"""
    name = request.match_info.get('name', "Anonymous")
    loop = asyncio.get_event_loop()
    # if you want to wait for result
    r = await loop.run_in_executor(executor=request.app["workers_pool"], func=blocking_code)
    text = "Hello, " + name + r
    return web.Response(text=text)


async def fast_sleep_answer(request: web.Request):
    """We send response as fast as possible and do all work in another asyncio Task"""
    name = request.match_info.get('name', "Anonymous")
    loop = asyncio.get_event_loop()
    # if you do not want to want for result
    asyncio.create_task(blocking_code_task(loop, request))
    text = "Fast answer" + name
    return web.Response(text=text)


async def on_shutdown(app):
    """Do not forget to correctly close ThreadPool"""
    app["workers_pool"].shutdown()
    logging.info(f"{datetime.now()}: Pool is closed")


async def init(args=None):
    """Changed your code for newer aiohttp"""
    pool = ThreadPoolExecutor(8)
    app = web.Application()
    app.router.add_get('/quick', handle)
    app.router.add_get('/sleep', sleephandle)
    app.router.add_get('/fast', fast_sleep_answer)
    app["workers_pool"] = pool  # can be ThreadPool or ProcessPool
    app.on_shutdown.append(on_shutdown)  # close the pool when app closes
    return app

# the better way to tun app
# name of file is x.py
# in Linux command will be python3
# python -m aiohttp.web -H 0.0.0.0 -P 8080 x:init
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    web.run_app(init(), host="0.0.0.0", port=8080)

所有阻塞 IN/OUT 操作都是在 ThreadPoolExecutor 中进行的。如果您的任务是 CPU 绑定的,请使用 ProcessPoolExecutor。我展示了两种情况:1) 当你不能尽快回答并需要等待结果时 2) 当你可以回答然后在后台进行所有工作时。