为什么 asyncio.sleep 在与 aiohttp websockets 一起使用时将整个任务(内部有 websocket)冻结在 运行 状态?
Why asyncio.sleep freezes whole Task (which has websocket inside) in running state when used with aiohttp websockets?
今天我发现 asyncio
或 aiohttp
有一个很奇怪的问题。
我编写了非常简单的使用 Websockets 的服务器和客户端。当服务器从客户端获得连接时,它创建两个任务,一个任务从客户端监听数据,另一个任务向客户端发送数据。
如果客户端决定结束会话,它会向服务器发送关闭消息,listen_on_socket(服务器)任务完成正常,但是 send_to_socket(服务器)任务如果包含 asyncio.sleep
则被冻结在任务里面。我什至无法取消冻结的任务。
问题的原因是什么,我该如何处理?
我有以下 aiohttp 服务器代码作为示例:
from aiohttp import web, WSMsgType
import asyncio
async def send_to_socket(ws: web.WebSocketResponse):
"""helper func which send messages to socket"""
for i in range(10):
try:
if ws.closed:
break
else:
await ws.send_str(f"I am super socket server-{i} !!!")
except Exception as ex:
print(ex)
break
# remove await asyncio.sleep(0.5) and it works !
print("| send_to_socket | St sleeping")
await asyncio.sleep(0.5)
print("| send_to_socket | Stopped sleeping") # you will not get the message
if not ws.closed:
await ws.send_str("close")
print("| send_to_socket | Finished sending")
async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
"""helper func which Listen messages to socket"""
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
send_task.cancel()
print(send_task.cancelled(), send_task.done(), send_task)
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print("* listen_on_socket * Finished listening")
async def websocket_handler(req: web.Request) -> web.WebSocketResponse:
"""Socket aiohttp handler"""
ws = web.WebSocketResponse()
print(f"Handler | Started websocket: {id(ws)}")
await ws.prepare(req)
t = asyncio.create_task(send_to_socket(ws))
await asyncio.gather(listen_on_socket(ws, t), t)
print("Handler | websocket connection closed")
return ws
if __name__ == '__main__':
app = web.Application()
app.router.add_get("/socket", websocket_handler)
web.run_app(app, host="0.0.0.0", port=9999)
我有以下 aiohttp 客户端代码作为示例:
from aiohttp import ClientSession
import aiohttp
import asyncio
async def client():
n = 3
async with ClientSession() as session:
async with session.ws_connect('http://localhost:9999/socket') as ws:
async for msg in ws:
if n == 0:
await ws.send_str("close")
break
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
break
else:
print(msg.data)
n -= 1
elif msg.type == aiohttp.WSMsgType.ERROR:
break
print("Client stopped")
if __name__ == '__main__':
asyncio.run(client())
不是死机,只是你的取消和记录有点不正确,你应该等待取消的任务
async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
"""helper func which Listen messages to socket"""
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
send_task.cancel()
try:
await send_task
except asyncio.CancelledError:
print("send task cancelled")
print(send_task.cancelled(), send_task.done(), send_task)
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print("* listen_on_socket * Finished listening")
还应该在 websocket_handler
内的 gather
调用中设置 return_exceptions=True
以防止异常传播。
您可以用 try-finally 块包装所有函数体并确保它正常完成(确定只是为了调试,而不是在最终实现中)。
来自 aiohttp
documentation:从 WebSocket 读取(等待 ws.receive())只能在请求处理程序任务中完成 ;但是,将 (ws.send_str(...)) 写入 WebSocket、关闭 (await ws.close()) 和取消处理程序任务可能会委托给其他任务。
这里的错误是我在 listen_on_socket 中创建了 reading from ws 任务。
解决。仅在服务器端发生变化,客户端相同:
from aiohttp import web, WSMsgType
import asyncio
async def send_to_socket(ws: web.WebSocketResponse):
"""helper func which send messages to socket"""
for i in range(4):
try:
if ws.closed:
break
else:
await ws.send_str(f"I am super socket server-{i} !!!")
except Exception as ex:
print(ex)
break
await asyncio.sleep(1.5)
if not ws.closed:
await ws.send_str("close")
print(f"| send_to_socket | Finished sending {id(ws)}")
async def websocket_handler(req: web.Request) -> web.WebSocketResponse:
"""Socket aiohttp handler"""
ws = web.WebSocketResponse()
print(f"Handler | Started websocket: {id(ws)}")
await ws.prepare(req)
# create another task for writing
asyncio.create_task(send_to_socket(ws))
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print(f"Connection {id(ws)} is finished")
return ws
if __name__ == '__main__':
app = web.Application()
app.router.add_get("/socket", websocket_handler)
web.run_app(app, host="0.0.0.0", port=9999)
今天我发现 asyncio
或 aiohttp
有一个很奇怪的问题。
我编写了非常简单的使用 Websockets 的服务器和客户端。当服务器从客户端获得连接时,它创建两个任务,一个任务从客户端监听数据,另一个任务向客户端发送数据。
如果客户端决定结束会话,它会向服务器发送关闭消息,listen_on_socket(服务器)任务完成正常,但是 send_to_socket(服务器)任务如果包含 asyncio.sleep
则被冻结在任务里面。我什至无法取消冻结的任务。
问题的原因是什么,我该如何处理?
我有以下 aiohttp 服务器代码作为示例:
from aiohttp import web, WSMsgType
import asyncio
async def send_to_socket(ws: web.WebSocketResponse):
"""helper func which send messages to socket"""
for i in range(10):
try:
if ws.closed:
break
else:
await ws.send_str(f"I am super socket server-{i} !!!")
except Exception as ex:
print(ex)
break
# remove await asyncio.sleep(0.5) and it works !
print("| send_to_socket | St sleeping")
await asyncio.sleep(0.5)
print("| send_to_socket | Stopped sleeping") # you will not get the message
if not ws.closed:
await ws.send_str("close")
print("| send_to_socket | Finished sending")
async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
"""helper func which Listen messages to socket"""
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
send_task.cancel()
print(send_task.cancelled(), send_task.done(), send_task)
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print("* listen_on_socket * Finished listening")
async def websocket_handler(req: web.Request) -> web.WebSocketResponse:
"""Socket aiohttp handler"""
ws = web.WebSocketResponse()
print(f"Handler | Started websocket: {id(ws)}")
await ws.prepare(req)
t = asyncio.create_task(send_to_socket(ws))
await asyncio.gather(listen_on_socket(ws, t), t)
print("Handler | websocket connection closed")
return ws
if __name__ == '__main__':
app = web.Application()
app.router.add_get("/socket", websocket_handler)
web.run_app(app, host="0.0.0.0", port=9999)
我有以下 aiohttp 客户端代码作为示例:
from aiohttp import ClientSession
import aiohttp
import asyncio
async def client():
n = 3
async with ClientSession() as session:
async with session.ws_connect('http://localhost:9999/socket') as ws:
async for msg in ws:
if n == 0:
await ws.send_str("close")
break
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
break
else:
print(msg.data)
n -= 1
elif msg.type == aiohttp.WSMsgType.ERROR:
break
print("Client stopped")
if __name__ == '__main__':
asyncio.run(client())
不是死机,只是你的取消和记录有点不正确,你应该等待取消的任务
async def listen_on_socket(ws: web.WebSocketResponse, send_task: asyncio.Task):
"""helper func which Listen messages to socket"""
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
send_task.cancel()
try:
await send_task
except asyncio.CancelledError:
print("send task cancelled")
print(send_task.cancelled(), send_task.done(), send_task)
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print("* listen_on_socket * Finished listening")
还应该在 websocket_handler
内的 gather
调用中设置 return_exceptions=True
以防止异常传播。
您可以用 try-finally 块包装所有函数体并确保它正常完成(确定只是为了调试,而不是在最终实现中)。
来自 aiohttp
documentation:从 WebSocket 读取(等待 ws.receive())只能在请求处理程序任务中完成 ;但是,将 (ws.send_str(...)) 写入 WebSocket、关闭 (await ws.close()) 和取消处理程序任务可能会委托给其他任务。
这里的错误是我在 listen_on_socket 中创建了 reading from ws 任务。
解决。仅在服务器端发生变化,客户端相同:
from aiohttp import web, WSMsgType
import asyncio
async def send_to_socket(ws: web.WebSocketResponse):
"""helper func which send messages to socket"""
for i in range(4):
try:
if ws.closed:
break
else:
await ws.send_str(f"I am super socket server-{i} !!!")
except Exception as ex:
print(ex)
break
await asyncio.sleep(1.5)
if not ws.closed:
await ws.send_str("close")
print(f"| send_to_socket | Finished sending {id(ws)}")
async def websocket_handler(req: web.Request) -> web.WebSocketResponse:
"""Socket aiohttp handler"""
ws = web.WebSocketResponse()
print(f"Handler | Started websocket: {id(ws)}")
await ws.prepare(req)
# create another task for writing
asyncio.create_task(send_to_socket(ws))
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
break
elif msg.type == WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print(f"Connection {id(ws)} is finished")
return ws
if __name__ == '__main__':
app = web.Application()
app.router.add_get("/socket", websocket_handler)
web.run_app(app, host="0.0.0.0", port=9999)