Python - WebSocket 的异步 callback/receiver

Python - Async callback/receiver for WebSocket

我正在尝试实现与服务器的 WebSocket 连接(Python 应用 <=> Django 应用)

整个系统运行在大 Asyncio 循环中,有很多任务。代码片段只是非常小的测试部分。

我可以随时将任何数据发送到服务器,其中许多将输入请求内容并等待响应。但我想为所有传入消息设置一些“始终 运行” 处理程序。 (当 Django 数据库中的某些内容发生变化时,我想将更改发送到 python 应用程序)。

如何始终包含 运行 receiver/ 或向 websocket 添加回调?我找不到任何解决方案。

我的代码片段:

import asyncio, json, websockets, logging

class UpdateConnection:

    async def connect(self,botName):
        self.sock = await websockets.connect('ws://localhost:8000/updates/bot/'+botName)
        
    async def send(self,data):
        try:
            await self.sock.send(json.dumps(data))
        except:
            logging.info("Websocket connection lost!")
            # Find a way how to reconenct... or make socket reconnect automatically

            
if __name__ == '__main__':
    async def DebugLoop(socketCon):
        await socketCon.connect("dev")
        print("Running..")
        while True:
            data = {"type": "debug"}
            await socketCon.send(data)
            await asyncio.sleep(1)

    uSocket = UpdateConnection()
    loop = asyncio.get_event_loop()
    loop.create_task(DebugLoop(uSocket))
    loop.run_forever()

连接后我的调试服务器将开始以随机间隔向客户端发送随机消息,我想以某种方式以异步方式处理它们。

感谢您的帮助:)

你不必做的那么复杂。首先,我建议您使用 websockets 模块提供的上下文模式。

来自文档:

connect() can be used as an infinite asynchronous iterator to reconnect automatically on errors:

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

此外,您只需通过等待传入消息来保持连接有效:

my_websocket = None

async for websocket in websockets.connect('ws://localhost:8000/updates/bot/' + botName):
    try:
        my_websocket = websocket
        async for message in websocket:
            pass # here you could also process incoming messages
    except websockets.ConnectionClosed:
        my_websocket = None
        continue

如您所见,我们这里有一个嵌套循环:

  1. 外循环不断重连服务器
  2. 内循环一次处理一条传入消息

如果您已连接,并且没有来自服务器的消息,这将只是休眠。

这里发生的另一件事是 my_websocket 设置为活动连接,并在连接丢失时再次取消设置。 在脚本的其他部分,您可以使用 my_websocket 发送数据。请注意,无论您在哪里使用它,您都需要检查它当前是否已设置:

async def send(data):
    if my_websocket:
        await my_websocket.send(json.dumps(data))

这只是一个例子,你也可以将websocket对象作为一个对象成员,或者通过setter函数传递给另一个组件等