具有多个服务器的 Asyncio 循环

Asyncio loop with several servers

我有以下代码片段,它应该同时运行一个 websockets 服务器和一个 NATS 服务器。

每次新消息到达 NATS 时,websocket 服务器应该将消息发送到所有连接的 websockets。我看到一个奇怪的行为,因为 websocket 连接没有保持活动状态。

我想我把 asyncio 事件循环的用法搞混了。有人知道我在这里遗漏了什么吗?

import asyncio
import os
import logging
import json
import websockets
from nats.aio.client import Client as NATS

async def main():
    """Start a websocket server and a NATS subscription, then block forever.

    Each message received on the ``data.*`` NATS subjects triggers a send of
    the literal text ``"test"`` to every websocket stored in the local set.
    """
    connected_sockets = set()

    async def register(websocket, path):
        # Remember the freshly connected websocket.
        # NOTE(review): this handler returns right after storing the socket.
        # Per the websockets documentation the server closes a connection
        # once its handler coroutine finishes -- confirm this is intended.
        connected_sockets.add(websocket)

    async def message_handler(msg):
        # Decode the NATS payload for logging, then notify every websocket.
        payload = msg.data.decode()
        logging.debug('handling new message: {}'.format(payload))
        for socket in connected_sockets:
            await socket.send("test")

    # Websocket side: start accepting connections on localhost:8765.
    await websockets.serve(register, "localhost", 8765)

    # NATS side: connect to the server named by NATS_URL (with a default)
    # and subscribe to all data related subjects.
    server_url = os.getenv('NATS_URL', 'nats://nats:4222')
    nats_client = NATS()
    await nats_client.connect([server_url])
    await nats_client.subscribe("data.*", cb=message_handler)

    # Wait on an event that is never set so the coroutine never returns.
    never_set = asyncio.Event()
    await never_set.wait()

# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
    # Emit DEBUG-level log records (including the handler's debug message).
    logging.basicConfig(level=logging.DEBUG)

    # Create an event loop and run main() on it.
    asyncio.run(main())

每次有新的 websocket 客户端连接时,注册完成后连接就会被关闭:

DEBUG:websockets.protocol:server - state = CONNECTING
DEBUG:websockets.protocol:server - event = connection_made(<_SelectorSocketTransport fd=6 read=idle write=<idle, bufsize=0>>)
DEBUG:websockets.protocol:server - event = data_received(<535 bytes>)
DEBUG:websockets.server:server < GET /?token=234RZRZER HTTP/1.1
DEBUG:websockets.server:server < Headers([('Host', '127.0.0.1:8765'), ('Connection', 'Upgrade'), ('Pragma', 'no-cache'), ('Cache-Control', 'no-cache'), ('Upgrade', 'websocket'), ('Origin', 'file://'), ('Sec-WebSocket-Version', '13'), ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_4_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36'), ('Accept-Encoding', 'gzip, deflate, br'), ('Accept-Language', 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7'), ('Sec-GPC', '1'), ('Sec-WebSocket-Key', 'KL0hFYOjOc25n/XwcON/9A=='), ('Sec-WebSocket-Extensions', 'permessage-deflate; client_max_window_bits')])
DEBUG:websockets.server:server > HTTP/1.1 101 Switching Protocols
DEBUG:websockets.server:server > Headers([('Upgrade', 'websocket'), ('Connection', 'Upgrade'), ('Sec-WebSocket-Accept', 'JpQAUwE+IzhPyNdqQuLeyROFhGo='), ('Sec-WebSocket-Extensions', 'permessage-deflate'), ('Date', 'Wed, 04 Aug 2021 13:21:40 GMT'), ('Server', 'Python/3.8 websockets/9.1')])
DEBUG:websockets.protocol:server - state = OPEN
DEBUG:websockets.protocol:server - state = CLOSING
DEBUG:websockets.protocol:server > Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server - event = data_received(<8 bytes>)
DEBUG:websockets.protocol:server < Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server x half-closing TCP connection
DEBUG:websockets.protocol:server - event = eof_received()
DEBUG:websockets.protocol:server - event = connection_lost(None)
DEBUG:websockets.protocol:server - state = CLOSED
DEBUG:websockets.protocol:server x code = 1000, reason = [no reason]

您可能需要类似下面这样的结构——两个任务:一个为 websockets 提供服务,另一个用于监听消息,再用一个队列把两者连接起来。

这里可能有一些低级错误,因为我手头没有可以用来测试的 NATS/websocket 环境。

import asyncio
import os
import logging
import websockets
from nats.aio.client import Client as NATS


async def boot_server(
    stop_signal: asyncio.Event, message_queue: asyncio.Queue
):
    """Serve websockets on localhost:8765 and broadcast queued messages.

    Every item taken from ``message_queue`` is sent to all websocket clients
    connected at that moment. The coroutine returns as soon as
    ``stop_signal`` is set, closing the server on the way out.

    Args:
        stop_signal: Event that ends the broadcast loop once it is set.
        message_queue: Queue whose items are forwarded to the clients.
    """
    clients = set()

    async def register(websocket, path):
        # Track the client for as long as its connection stays open.
        clients.add(websocket)
        try:
            # Reading incoming messages keeps this handler running until the
            # peer disconnects; the connection is closed when it returns.
            async for message in websocket:
                print(websocket, message)
        finally:
            # discard() never raises, so no blanket try/except is needed.
            clients.discard(websocket)

    async with websockets.serve(register, "localhost", 8765):
        stop_waiter = asyncio.create_task(stop_signal.wait())
        next_message = None
        try:
            while not stop_signal.is_set():
                # Race the queue against the stop signal so that a stop
                # request is honoured even when no message ever arrives.
                next_message = asyncio.create_task(message_queue.get())
                await asyncio.wait(
                    {next_message, stop_waiter},
                    return_when=asyncio.FIRST_COMPLETED,
                )
                if not next_message.done():
                    # Only the stop signal fired; the pending get() is
                    # cancelled in the ``finally`` clause below.
                    break
                msg = next_message.result()
                # Iterate over a snapshot: clients may connect or disconnect
                # while we are suspended inside send().
                for client in tuple(clients):
                    try:
                        await client.send(msg)
                    except asyncio.CancelledError:
                        raise
                    except Exception:
                        # One broken client must not prevent delivery to
                        # the others or take the whole server down.
                        logging.exception(
                            "failed to send message to %r", client
                        )
        finally:
            # cancel() is a no-op on tasks that have already finished.
            stop_waiter.cancel()
            if next_message is not None:
                next_message.cancel()


async def listen_nats(message_queue: asyncio.Queue):
    """Connect to NATS and forward every ``data.*`` message into the queue.

    The server URL is read from the ``NATS_URL`` environment variable and
    defaults to ``nats://nats:4222``.

    Args:
        message_queue: Queue that receives the decoded payload of each
            message delivered to the subscription.

    Returns:
        The connected NATS client. The coroutine returns right after
        subscribing; handing the client back lets the caller close the
        connection (e.g. ``await nc.close()``) during shutdown instead of
        dropping the only reference to it.
    """
    # Connection to NATS message queue
    nats_url = os.getenv("NATS_URL", "nats://nats:4222")
    nc = NATS()
    await nc.connect([nats_url])

    async def message_handler(msg):
        data = msg.data.decode()
        # Lazy %-formatting: the string is only built if DEBUG is enabled.
        logging.debug("handling new message: %s", data)
        await message_queue.put(data)

    # Subscription to all data related messages
    await nc.subscribe("data.*", cb=message_handler)

    return nc


async def main():
    """Run the websocket broadcaster and the NATS listener side by side.

    Both coroutines are started as tasks that share one queue: the NATS
    listener puts messages on it and the websocket server broadcasts them.
    The coroutine ends when either task fails, when the server task
    finishes, or when it is cancelled.

    Raises:
        Exception: whatever the first failing task raised, so that a broken
            NATS connection or a server start-up error is not lost.
    """
    stop_signal = asyncio.Event()
    message_queue = asyncio.Queue()
    ws_server_task = asyncio.create_task(
        boot_server(stop_signal, message_queue)
    )
    nats_task = asyncio.create_task(listen_nats(message_queue))

    try:
        # Await both tasks instead of polling with sleep(): gather() raises
        # as soon as either of them fails. Ctrl+C normally reaches this
        # coroutine as a cancellation issued by asyncio.run(), not as a
        # KeyboardInterrupt raised in here, so cleanup lives in ``finally``.
        await asyncio.gather(nats_task, ws_server_task)
    finally:
        # Ask the server loop to stop, and cancel whatever is still running
        # so that nothing outlives main(); cancel() is a no-op on finished
        # tasks.
        stop_signal.set()
        for task in (nats_task, ws_server_task):
            task.cancel()


# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Emit DEBUG-level log records (including the listener's debug message).
    logging.basicConfig(level=logging.DEBUG)

    # Create an event loop and run main() on it.
    asyncio.run(main())