将带有 websockets 的 Quart 项目从 asyncio 迁移到 trio
Migrating a Quart project with websockets from asyncio to trio
我正在尝试将我的 asyncio 项目转换为 trio。
我知道我必须使用内存通道而不是队列,但出于某种原因我没有得到我期望的结果。
我的主要问题是,当我 运行 两个客户端时,如果第二个客户端离开,第一个客户端不会收到通知(从服务器广播 'part' 消息会引发错误)。
另一个问题是有时客户端在打开 websocket 时立即退出。
当我使用 asyncio 时,一切正常。
这是我在第二个客户端断开连接时得到的堆栈跟踪:
[2021-07-30 18:39:51,899] ERROR in app: Exception on websocket /ws
Traceback (most recent call last):
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 175, in handle_websocket
return await self.full_dispatch_websocket(websocket_context)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 197, in full_dispatch_websocket
result = await self.handle_user_exception(error)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 166, in handle_user_exception
raise error
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 195, in full_dispatch_websocket
result = await self.dispatch_websocket(websocket_context)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/app.py", line 1651, in dispatch_websocket
return await self.ensure_async(handler)(**websocket_.view_args)
File "/tmp/debug/server.py", line 103, in wsocket
nursery.start_soon(receiving, u)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
raise combined_error_from_nursery
trio.MultiError: Cancelled(), Cancelled(), Cancelled()
Details of embedded exception 1:
Traceback (most recent call last):
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
Details of embedded exception 2:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 68, in receiving
data = await websocket.receive_json()
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 68, in receive_json
data = await self.receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 57, in receive
return await self._receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
return await trio.lowlevel.wait_task_rescheduled(abort_fn)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
Details of embedded exception 3:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 54, in sending
data = await u.queue_recv.receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
return await trio.lowlevel.wait_task_rescheduled(abort_fn)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 63, in sending
await broadcast({'type': 'part', 'data': u.name})
File "/tmp/debug/server.py", line 75, in broadcast
await user.queue_send.send(message)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 159, in send
await trio.lowlevel.checkpoint_if_cancelled()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2361, in checkpoint_if_cancelled
await _core.checkpoint()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2339, in checkpoint
await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
这是代码(将 TRIO
设置为 False
以使用 asyncio):
server.py
#!/usr/bin/env python
from quart import Quart, websocket, request, jsonify, json
from quart_trio import QuartTrio
from functools import wraps
import uuid
import trio
import asyncio
from quart_auth import AuthUser, AuthManager, login_user, _AuthSerializer
TRIO = True
if TRIO:
app = QuartTrio(__name__)
else:
app = Quart(__name__)
app.secret_key = '**changeme**'
authorized_users = set()
class User(AuthUser):
@staticmethod
def current():
token = websocket.cookies['QUART_AUTH']
serializer = _AuthSerializer('**changeme**', 'quart auth salt')
user_id = serializer.loads(token)
for u in authorized_users:
if u.auth_id == user_id:
return u
return None
def __init__(self, auth_id):
super().__init__(auth_id)
self.name = None
self.queue = None # asyncio
self.queue_send = None #trio
self.queue_recv = None #trio
self.connected = False
self.websockets = set()
def to_dict(self):
return {
'id': self.auth_id,
'name': self.name
}
auth_manager = AuthManager()
auth_manager.user_class = User
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive()
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name})
async def receiving(u: User):
while True:
data = await websocket.receive_json()
if data['type'] == 'msg':
await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})
async def broadcast(message):
for user in [u for u in authorized_users if u.connected]:
if TRIO:
await user.queue_send.send(message)
else:
await user.queue.put(message)
@app.route('/api/v1/auth', methods=['POST'])
async def auth_login():
data = await request.json
user_id = str(uuid.uuid4())[:8]
u = User(user_id)
u.name = data['login'] or 'Anonymous'+user_id
if TRIO:
u.queue_send, u.queue_recv = trio.open_memory_channel(float('inf'))
else:
u.queue = asyncio.Queue()
login_user(u, True)
authorized_users.add(u)
return jsonify({'id': user_id, 'name': u.name}), 200
@app.websocket('/ws')
async def wsocket():
u = User.current()
if u is None:
return
u.websockets.add(websocket._get_current_object())
u.connected = True
if TRIO:
async with trio.open_nursery() as nursery:
nursery.start_soon(sending, u)
nursery.start_soon(receiving, u)
else:
producer = asyncio.create_task(sending(u))
consumer = asyncio.create_task(receiving(u))
await asyncio.gather(producer, consumer)
auth_manager.init_app(app)
if __name__ == "__main__":
app.run(host='localhost', port=8080)
client.py
#!/usr/bin/env python
import asks
import trio
import trio_websocket
import json
asks.init(trio)
class User:
def __init__(self, name: str="") -> None:
self.name = name
class Client(User):
def __init__(self) -> None:
super(Client, self).__init__()
self.web_url = 'http://localhost:8080/api/v1'
self.ws_url = 'ws://localhost:8080/ws'
self.ws = None
self.nursery = None
self.cookiejar = {}
async def send(self, msg: dict) -> None:
if self.ws is not None:
await self.ws.send_message(json.dumps(msg))
async def reader(self, websocket) -> None:
while True:
try:
message_raw = await websocket.get_message()
msg = json.loads(message_raw)
if msg['type'] == 'msg':
print(f"<{msg['user']}> {msg['data']}")
elif msg['type'] == 'join':
print(f"* {msg['data']} joined")
elif msg['type'] == 'part':
print(f"* {msg['data']} left")
except trio_websocket.ConnectionClosed:
break
async def login(self) -> None:
rlogin = await asks.post(self.web_url + '/auth', json={'login': self.name, 'password': 'password'})
for c in rlogin.cookies:
if c.name == 'QUART_AUTH':
self.cookiejar = {'QUART_AUTH': c.value}
async def connect(self) -> None:
await self.login()
async with trio_websocket.open_websocket_url(self.ws_url, extra_headers=[('Cookie', 'QUART_AUTH'+'='+self.cookiejar['QUART_AUTH'])]) as websocket:
self.ws = websocket
await self.send({'type': 'msg', 'data': 'hello'})
async with trio.open_nursery() as nursery:
self.nursery = nursery
nursery.start_soon(self.reader, websocket)
def run(self) -> None:
trio.run(self.connect)
c = Client()
c.name = 'clientA'
c.run()
编辑:我使用 anyio 进行了测试,虽然 anyio+trio 的行为相同,但 anyio+asyncio 重现了该问题(无一例外)。所以我猜它来自队列替换。
好的,@tibs,我想我已经找到问题所在了。问题在于 Trio 处理取消的方式。如需完整文档,请阅读此文档:
https://trio.readthedocs.io/en/stable/reference-core.html#cancellation-and-timeouts
但是,为了解释这里发生的事情,当用户断开连接时,Quart-Trio 所做的是在该 websocket 下 running/waiting 的每个协程中引发 Cancelled
异常。对于 websocket 用户,目前有两个位置正在等待:
在async def sending(u: User):
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive() <--- Code is waiting here, Cancelled is raised here
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name})
在async def receiving(u: User):
async def receiving(u: User):
while True:
data = await websocket.receive_json() <--- Code is waiting here, Cancelled is raised here
if data['type'] == 'msg':
await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})
好的,接下来会发生什么?那么,在 sending()
函数中,我们向下移动到 finally
块,该块开始执行,但随后我们调用另一个等待函数:
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name}) <--- we call an awaitable here
来自 Trio 文档:
Cancellations in Trio are “level triggered”, meaning that once a block has been cancelled, all cancellable operations in that block will keep raising Cancelled.
所以当 await broadcast(...)
被调用时,它立即是 Cancelled
,不像 asyncio
的行为不同。这解释了为什么永远不会发送您的“部分”消息。所以在三重奏的时候,如果你想在被取消的时候做一些清理工作,你应该打开一个新的取消范围,并屏蔽它不被取消,像这样:
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive() <--- Code is waiting here, Cancelled is raised here
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
with trio.move_on_after(5) as leaving_cancel_scope:
# Shield from the cancellation for 5s to run the broadcast of leaving
leaving_cancel_scope.shield = True
await broadcast({'type': 'part', 'data': u.name})
或者您可以在应用程序 nursery 上启动广播协程。请注意,如果 broadcast(...)
崩溃,您将导致整个 运行 应用程序崩溃,除非您在 broadcast(...)
函数中放置 try/except:
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive()
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
app.nursery.start_soon(broadcast, {'type': 'part', 'data': u.name})
在此之后你仍然会得到 Cancelled
异常流到你的 websocket 函数,所以你可能想在那里捕获它们。请注意,您需要捕获 BaseException
才能捕获错误,例如:
@app.websocket('/ws')
async def wsocket():
u = User.current()
if u is None:
return
u.websockets.add(websocket._get_current_object())
u.connected = True
if TRIO:
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(sending, u)
nursery.start_soon(receiving, u)
except BaseException as e:
print(f'websocket funcs crashed with exception: {e}')
特别是因为 trio 不允许您静默删除异常,您需要捕获它们或崩溃。我希望这足以让您开始解决您遇到的问题。
我正在尝试将我的 asyncio 项目转换为 trio。 我知道我必须使用内存通道而不是队列,但出于某种原因我没有得到我期望的结果。
我的主要问题是,当我 运行 两个客户端时,如果第二个客户端离开,第一个客户端不会收到通知(从服务器广播 'part' 消息会引发错误)。 另一个问题是有时客户端在打开 websocket 时立即退出。 当我使用 asyncio 时,一切正常。
这是我在第二个客户端断开连接时得到的堆栈跟踪:
[2021-07-30 18:39:51,899] ERROR in app: Exception on websocket /ws
Traceback (most recent call last):
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 175, in handle_websocket
return await self.full_dispatch_websocket(websocket_context)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 197, in full_dispatch_websocket
result = await self.handle_user_exception(error)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 166, in handle_user_exception
raise error
File "/tmp/debug/venv/lib/python3.9/site-packages/quart_trio/app.py", line 195, in full_dispatch_websocket
result = await self.dispatch_websocket(websocket_context)
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/app.py", line 1651, in dispatch_websocket
return await self.ensure_async(handler)(**websocket_.view_args)
File "/tmp/debug/server.py", line 103, in wsocket
nursery.start_soon(receiving, u)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
raise combined_error_from_nursery
trio.MultiError: Cancelled(), Cancelled(), Cancelled()
Details of embedded exception 1:
Traceback (most recent call last):
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
Details of embedded exception 2:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 68, in receiving
data = await websocket.receive_json()
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 68, in receive_json
data = await self.receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/quart/wrappers/websocket.py", line 57, in receive
return await self._receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
return await trio.lowlevel.wait_task_rescheduled(abort_fn)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
Details of embedded exception 3:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 54, in sending
data = await u.queue_recv.receive()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 314, in receive
return await trio.lowlevel.wait_task_rescheduled(abort_fn)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/debug/server.py", line 63, in sending
await broadcast({'type': 'part', 'data': u.name})
File "/tmp/debug/server.py", line 75, in broadcast
await user.queue_send.send(message)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_channel.py", line 159, in send
await trio.lowlevel.checkpoint_if_cancelled()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2361, in checkpoint_if_cancelled
await _core.checkpoint()
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 2339, in checkpoint
await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "/tmp/debug/venv/lib/python3.9/site-packages/outcome/_impl.py", line 138, in unwrap
raise captured_error
File "/tmp/debug/venv/lib/python3.9/site-packages/trio/_core/_run.py", line 1172, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
这是代码(将 TRIO
设置为 False
以使用 asyncio):
server.py
#!/usr/bin/env python
from quart import Quart, websocket, request, jsonify, json
from quart_trio import QuartTrio
from functools import wraps
import uuid
import trio
import asyncio
from quart_auth import AuthUser, AuthManager, login_user, _AuthSerializer
TRIO = True
if TRIO:
app = QuartTrio(__name__)
else:
app = Quart(__name__)
app.secret_key = '**changeme**'
authorized_users = set()
class User(AuthUser):
@staticmethod
def current():
token = websocket.cookies['QUART_AUTH']
serializer = _AuthSerializer('**changeme**', 'quart auth salt')
user_id = serializer.loads(token)
for u in authorized_users:
if u.auth_id == user_id:
return u
return None
def __init__(self, auth_id):
super().__init__(auth_id)
self.name = None
self.queue = None # asyncio
self.queue_send = None #trio
self.queue_recv = None #trio
self.connected = False
self.websockets = set()
def to_dict(self):
return {
'id': self.auth_id,
'name': self.name
}
auth_manager = AuthManager()
auth_manager.user_class = User
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive()
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name})
async def receiving(u: User):
while True:
data = await websocket.receive_json()
if data['type'] == 'msg':
await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})
async def broadcast(message):
for user in [u for u in authorized_users if u.connected]:
if TRIO:
await user.queue_send.send(message)
else:
await user.queue.put(message)
@app.route('/api/v1/auth', methods=['POST'])
async def auth_login():
data = await request.json
user_id = str(uuid.uuid4())[:8]
u = User(user_id)
u.name = data['login'] or 'Anonymous'+user_id
if TRIO:
u.queue_send, u.queue_recv = trio.open_memory_channel(float('inf'))
else:
u.queue = asyncio.Queue()
login_user(u, True)
authorized_users.add(u)
return jsonify({'id': user_id, 'name': u.name}), 200
@app.websocket('/ws')
async def wsocket():
u = User.current()
if u is None:
return
u.websockets.add(websocket._get_current_object())
u.connected = True
if TRIO:
async with trio.open_nursery() as nursery:
nursery.start_soon(sending, u)
nursery.start_soon(receiving, u)
else:
producer = asyncio.create_task(sending(u))
consumer = asyncio.create_task(receiving(u))
await asyncio.gather(producer, consumer)
auth_manager.init_app(app)
if __name__ == "__main__":
app.run(host='localhost', port=8080)
client.py
#!/usr/bin/env python
import asks
import trio
import trio_websocket
import json
asks.init(trio)
class User:
def __init__(self, name: str="") -> None:
self.name = name
class Client(User):
def __init__(self) -> None:
super(Client, self).__init__()
self.web_url = 'http://localhost:8080/api/v1'
self.ws_url = 'ws://localhost:8080/ws'
self.ws = None
self.nursery = None
self.cookiejar = {}
async def send(self, msg: dict) -> None:
if self.ws is not None:
await self.ws.send_message(json.dumps(msg))
async def reader(self, websocket) -> None:
while True:
try:
message_raw = await websocket.get_message()
msg = json.loads(message_raw)
if msg['type'] == 'msg':
print(f"<{msg['user']}> {msg['data']}")
elif msg['type'] == 'join':
print(f"* {msg['data']} joined")
elif msg['type'] == 'part':
print(f"* {msg['data']} left")
except trio_websocket.ConnectionClosed:
break
async def login(self) -> None:
rlogin = await asks.post(self.web_url + '/auth', json={'login': self.name, 'password': 'password'})
for c in rlogin.cookies:
if c.name == 'QUART_AUTH':
self.cookiejar = {'QUART_AUTH': c.value}
async def connect(self) -> None:
await self.login()
async with trio_websocket.open_websocket_url(self.ws_url, extra_headers=[('Cookie', 'QUART_AUTH'+'='+self.cookiejar['QUART_AUTH'])]) as websocket:
self.ws = websocket
await self.send({'type': 'msg', 'data': 'hello'})
async with trio.open_nursery() as nursery:
self.nursery = nursery
nursery.start_soon(self.reader, websocket)
def run(self) -> None:
trio.run(self.connect)
c = Client()
c.name = 'clientA'
c.run()
编辑:我使用 anyio 进行了测试,虽然 anyio+trio 的行为相同,但 anyio+asyncio 重现了该问题(无一例外)。所以我猜它来自队列替换。
好的,@tibs,我想我已经找到问题所在了。问题在于 Trio 处理取消的方式。如需完整文档,请阅读此文档:
https://trio.readthedocs.io/en/stable/reference-core.html#cancellation-and-timeouts
但是,为了解释这里发生的事情,当用户断开连接时,Quart-Trio 所做的是在该 websocket 下 running/waiting 的每个协程中引发 Cancelled
异常。对于 websocket 用户,目前有两个位置正在等待:
在async def sending(u: User):
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive() <--- Code is waiting here, Cancelled is raised here
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name})
在async def receiving(u: User):
async def receiving(u: User):
while True:
data = await websocket.receive_json() <--- Code is waiting here, Cancelled is raised here
if data['type'] == 'msg':
await broadcast({'type': 'msg', 'user': u.name, 'data': data['data']})
好的,接下来会发生什么?那么,在 sending()
函数中,我们向下移动到 finally
块,该块开始执行,但随后我们调用另一个等待函数:
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
await broadcast({'type': 'part', 'data': u.name}) <--- we call an awaitable here
来自 Trio 文档:
Cancellations in Trio are “level triggered”, meaning that once a block has been cancelled, all cancellable operations in that block will keep raising Cancelled.
所以当 await broadcast(...)
被调用时,它立即是 Cancelled
,不像 asyncio
的行为不同。这解释了为什么永远不会发送您的“部分”消息。所以在三重奏的时候,如果你想在被取消的时候做一些清理工作,你应该打开一个新的取消范围,并屏蔽它不被取消,像这样:
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive() <--- Code is waiting here, Cancelled is raised here
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
with trio.move_on_after(5) as leaving_cancel_scope:
# Shield from the cancellation for 5s to run the broadcast of leaving
leaving_cancel_scope.shield = True
await broadcast({'type': 'part', 'data': u.name})
或者您可以在应用程序 nursery 上启动广播协程。请注意,如果 broadcast(...)
崩溃,您将导致整个 运行 应用程序崩溃,除非您在 broadcast(...)
函数中放置 try/except:
async def sending(u: User):
await broadcast({'type': 'join', 'data': u.name})
try:
while True:
if TRIO:
data = await u.queue_recv.receive()
else:
data = await u.queue.get()
for s in u.websockets:
await s.send_json(data)
finally:
u.websockets.remove(websocket._get_current_object())
if len(u.websockets) == 0:
u.connected = False
app.nursery.start_soon(broadcast, {'type': 'part', 'data': u.name})
在此之后你仍然会得到 Cancelled
异常流到你的 websocket 函数,所以你可能想在那里捕获它们。请注意,您需要捕获 BaseException
才能捕获错误,例如:
@app.websocket('/ws')
async def wsocket():
u = User.current()
if u is None:
return
u.websockets.add(websocket._get_current_object())
u.connected = True
if TRIO:
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(sending, u)
nursery.start_soon(receiving, u)
except BaseException as e:
print(f'websocket funcs crashed with exception: {e}')
特别是因为 trio 不允许您静默删除异常,您需要捕获它们或崩溃。我希望这足以让您开始解决您遇到的问题。