为什么 asyncio.sleep 在与 aiohttp websockets 一起使用时将整个任务(内部有 websocket)冻结在 运行 状态?

Why asyncio.sleep freezes whole Task (which has websocket inside) in running state when used with aiohttp websockets?

今天我发现 asyncioaiohttp 有一个很奇怪的问题。

我编写了非常简单的使用 Websockets 的服务器和客户端。当服务器从客户端获得连接时,它创建两个任务,一个任务从客户端监听数据,另一个任务向客户端发送数据。

如果客户端决定结束会话,它会向服务器发送关闭消息,listen_on_socket(服务器)任务完成正常,但是 send_to_socket(服务器)任务如果包含 asyncio.sleep 则被冻结在任务里面。我什至无法取消冻结的任务。

问题的原因是什么,我该如何处理?

我有以下 aiohttp 服务器代码作为示例:

from aiohttp import web, WSMsgType
import asyncio


async def send_to_socket(ws: web.WebSocketResponse):
    """helper func which send messages to socket"""
    for i in range(10):
        try:
            if ws.closed:
                break
            else:
                await ws.send_str(f"I am super socket server-{i} !!!")
        except Exception as ex:
            print(ex)
            break
        # remove await asyncio.sleep(0.5) and it works !
        print("| send_to_socket | St sleeping")
        await asyncio.sleep(0.5)
        print("| send_to_socket | Stopped sleeping")  # you will not get the message
    if not ws.closed:
        await ws.send_str("close")

    print("| send_to_socket | Finished sending")


async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
    """helper func which Listen messages to socket"""
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                send_task.cancel()
                print(send_task.cancelled(), send_task.done(), send_task)
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print("* listen_on_socket * Finished listening")


async def websocket_handler(req:  web.Request) -> web.WebSocketResponse:
    """Socket aiohttp handler"""
    ws = web.WebSocketResponse()
    print(f"Handler | Started websocket: {id(ws)}")
    await ws.prepare(req)
    t = asyncio.create_task(send_to_socket(ws))
    await asyncio.gather(listen_on_socket(ws, t), t)
    print("Handler | websocket connection closed")
    return ws

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get("/socket", websocket_handler)
    web.run_app(app, host="0.0.0.0", port=9999)

我有以下 aiohttp 客户端代码作为示例:

from aiohttp import ClientSession
import aiohttp
import asyncio


async def client():
    n = 3
    async with ClientSession() as session:
        async with session.ws_connect('http://localhost:9999/socket') as ws:
            async for msg in ws:
                if n == 0:
                    await ws.send_str("close")
                    break
                if msg.type == aiohttp.WSMsgType.TEXT:
                    if msg.data == "close":
                        await ws.close()
                        break
                    else:
                        print(msg.data)
                        n -= 1
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break
    print("Client stopped")

if __name__ == '__main__':
    asyncio.run(client())

不是死机,只是你的取消和记录有点不正确,你应该等待取消的任务

async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
    """helper func which Listen messages to socket"""
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                send_task.cancel()
                try:
                    await send_task
                except asyncio.CancelledError:
                    print("send task cancelled")
                print(send_task.cancelled(), send_task.done(), send_task)
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print("* listen_on_socket * Finished listening")

还应该在 websocket_handler 内的 gather 调用中设置 return_exceptions=True 以防止异常传播。

您可以用 try-finally 块包装所有函数体并确保它正常完成(确定只是为了调试,而不是在最终实现中)。

来自 aiohttp documentation从 WebSocket 读取(等待 ws.receive())只能在请求处理程序任务中完成 ;但是,将 (ws.send_str(...)) 写入 WebSocket、关闭 (await ws.close()) 和取消处理程序任务可能会委托给其他任务。

这里的错误是我在 listen_on_socket 中创建了 reading from ws 任务。

解决。仅在服务器端发生变化,客户端相同:

from aiohttp import web, WSMsgType
import asyncio


async def send_to_socket(ws: web.WebSocketResponse):
    """helper func which send messages to socket"""
    for i in range(4):
        try:
            if ws.closed:
                break
            else:
                await ws.send_str(f"I am super socket server-{i} !!!")
        except Exception as ex:
            print(ex)
            break
        await asyncio.sleep(1.5)
    if not ws.closed:
        await ws.send_str("close")

    print(f"| send_to_socket | Finished sending {id(ws)}")


async def websocket_handler(req:  web.Request) -> web.WebSocketResponse:
    """Socket aiohttp handler"""
    ws = web.WebSocketResponse()
    print(f"Handler | Started websocket: {id(ws)}")
    await ws.prepare(req)
    # create another task for writing
    asyncio.create_task(send_to_socket(ws))
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
                break
        elif msg.type == WSMsgType.ERROR:
            print(f'ws connection closed with exception {ws.exception()}')
    print(f"Connection {id(ws)} is finished")
    return ws

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get("/socket", websocket_handler)
    web.run_app(app, host="0.0.0.0", port=9999)