多客户端流式 Websocket 端点(Python)

Multiclient Streaming Websocket endpoint (Python)

最近我进入了 "crypto mania" 并开始在一些交易所围绕 API 编写自己的包装器。

Binance 特别有一个流式 websocket 端点。

您可以在其中通过 websocket 端点流式传输数据。 我想我会使用 sanic 自己尝试一下。

这是我的 websocket 路由

@ws_routes.websocket("/hello")
async def hello(request, ws):
    while True:
        await ws.send("hello")

现在我在 2 台不同的机器上有 2 个客户端连接到它

async def main():
    async with aiohttp.ClientSession() as session:

        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

然而,只有一个客户端能够连接并接收来自服务器的发送数据。我假设由于 while 循环它阻塞并阻止其他连接连接,因为它没有 yield?

我们如何在不阻塞其他连接的情况下将其流式传输到多个客户端?

我考虑过添加更多工作人员,这似乎可以解决问题,但我不明白的是,这不是一个可扩展性很强的解决方案。因为每个客户都是自己的工人,如果您有数千个甚至只有 10 个客户,那么每个客户 10 个工人 1。

那么 Binance 如何进行他们的 websocket 流式传输?或者推特流端点如何工作?

它如何能够为多个并发客户端提供无限流? 因为最终这就是我想要做的

也许是这样的?

import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
    async with aiohttp.ClientSession() as session:
        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))

解决这个问题的方法是这样的。

我正在使用 sanic 框架

class Stream:
    def __init__(self):
        self._connected_clients = set()

    async def __call__(self, *args, **kwargs):
        await self.stream(*args, **kwargs)

    async def stream(self, request, ws):
        self._connected_clients.add(ws)

        while True:
            disconnected_clients = []
            for client in self._connected_clients:  # check for disconnected clients
                if client.state == 3:  # append to a list because error will be raised if removed from set while iterating over it 
                    disconnected_clients.append(client)
            for client in disconnected_clients:  # remove disconnected clients
                self._connected_clients.remove(client)

            await asyncio.wait([client.send("Hello") for client in self._connected_clients]))


ws_routes.add_websocket_route(Stream(), "/stream")
  1. 跟踪每个 websocket 会话
  2. 附加到 listset
  3. 检查无效的 websocket 会话并从您的 websocket 会话容器中删除
  4. 做一个 await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]]) 基本上是广播。

5.profit!

这基本上就是 pubsub 设计模式