使用 python 网络框架 "quart" 的 Websockets?

Websockets with the python web framework "quart"?

我需要有关 python 网络框架工作的帮助,Quart,更具体地说是 websockets。我希望能够在连接时注册客户端(将其添加到 python 列表),并在断开连接时注销它们(将其从 python 列表中删除)。我在网上能找到的最接近的是这段代码:

connected = set()

async def handler(websocket, path):
    global connected
    # Register.
    connected.add(websocket)
    try:
        # Implement logic here.
        await asyncio.wait([ws.send("Hello!") for ws in connected])
        await asyncio.sleep(10)
    finally:
        # Unregister.
        connected.remove(websocket)

source

但这不适用于 quart websockets。

不胜感激。

此装饰器用于包装 websocket 处理程序时,将从 connected 集中添加和删除 websocket。需要 websocket 的 _get_current_object 方法来获取当前 context 中的 websocket,并且需要 try-finally 以确保无论出现任何错误都删除 websocket。请注意 app.websocket 必须包装(在)collect_websocket 用法之前。

from functools import wraps

connected = set()

def collect_websocket(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        global connected
        connected.add(websocket._get_current_object())
        try:
            return await func(*args, **kwargs)
        finally:
            connected.remove(websocket._get_current_object())
    return wrapper                                                                                                                                                                                                            


@app.websocket('/ws')                                                                                                                                                                                       
@collect_websocket
async def ws():
    ...

编辑: 我是Quart的作者。