Python Websockets 使用队列将数据从客户端处理程序传递到协程进行处理

Python Websockets Use Queue to pass data from client handler to a coroutine for processing

我有以下生产者-消费者架构:

问题是,由于找不到更好的词,我“被困在客户端处理程序中”。 我找不到将参数传递给客户端处理程序的方法,因此无法访问队列以将数据转发到 client_handler

这里是到目前为止到达的代码

import asyncio
import websockets

# Websockets client Handler accepts data and puts it into queue
async def client_handler(websocket, path):
    print(f"Connected with path '{path}'")

    async for msg_rx in websocket:
        if not msg_rx:
            break

        print(f"RX: {msg_rx }")

        # TODO Add to Queue
        # HOW DO I ACCESS THE QUEUE?

    print(f"Disconnected from Path '{path}'")


async def task_ws_server(q):
    
    # TODO how do I pass q to the client handler???
    async with websockets.serve(client_handler, '127.0.0.1', 5001):
        await asyncio.Future()  # run forever

async def task_consumer(q):
    # get elements from Queue
    while True:
        data = await q.get()

        # Process them like storing to file or forward to other code
        print(data) # print as stand-in for more complex code

        q.task_done()


async def main():
    
    # Queue to allow moving data from client_handler to Task_consumer
    q = asyncio.Queue()

    # Start consumer task
    consumer = asyncio.create_task(task_consumer(q))

    # Start and run WS Server to handle incoming connections
    await asyncio.gather(*[
        asyncio.create_task(task_ws_server(q)),
    ])

    await q.join()
    consumer.cancel()


if __name__ == '__main__':

    asyncio.run(main())

我找到了一个解决方案:将队列声明移到顶部,这意味着可以在异步函数内部访问队列。我不喜欢这个解决方案,因为这意味着我必须在本地声明 client_handler 或在全局范围内公开队列

您可以让您的 client_handler 将队列作为参数,并使用 functools.partial to create a function that you can pass to websockets.serve

import functools

async def client_handler(websocket, path, queue):
    # Do something with queue
    pass

async def task_ws_server(q):
    queued_client_handler = functools.partial(client_handler, queue=q)

    async with websockets.serve(queued_client_handler, '127.0.0.1', 5001):
        await asyncio.Future()  # run forever

我找到了面向对象的解决方案。通过 self 您可以访问 client_handler

之外的变量
import asyncio
import websockets

class CustomWebSocketServer():

    def __init__(self, host='127.0.0.1', port=5001):

        self.host = host
        self.port = port    

        self.queue_rx = None

        self.ws_clients = set()

    async def run(self):

        # Queue to allow moving data from client_handler to Task_consumer
        self.queue_rx = asyncio.Queue()

        # Start consumer task
        consumer = asyncio.create_task(self.task_consumer())

        # Start and run WS Server to handle incoming connections
        await websockets.serve(self.client_handler, '127.0.0.1', 5001)
        
        await asyncio.Future()  # run forever

        await self.queue_rx.join()
        consumer.cancel()

    # Websockets client Handler accepts data and puts it into queue
    async def client_handler(self, ws, path):
        print(f"Connected with path '{path}'")
        try:
            # Register ws client
            self.ws_clients.add(ws)

            async for msg_rx in ws:

                if not msg_rx:
                    break

                printf"RX: {msg_rx}")

                # Add to Queue
                await self.queue_rx.put(msg_rx)

        finally:

            self.ws_clients.remove(ws)

            print(f"Disconnected from Path '{path}'")


    async def task_consumer(self):
        print"task consumer start")
        # get elements from Queue
        while True:
            data = await self.queue_rx.get()

            # Process them like storing to file or forward to other code
            print(data) # print as stand-in for more complex code

            self.queue_rx.task_done()


if __name__ == '__main__':
    Server = CustomWebSocketServer()
    asyncio.run(Server.run())