多客户端流式 Websocket 端点(Python)
Multiclient Streaming Websocket endpoint (Python)
最近我进入了 "crypto mania" 并开始在一些交易所围绕 API 编写自己的包装器。
Binance 特别有一个流式 websocket 端点。
您可以在其中通过 websocket 端点流式传输数据。
我想我会使用 sanic 自己尝试一下。
这是我的 websocket 路由
@ws_routes.websocket("/hello")
async def hello(request, ws):
while True:
await ws.send("hello")
现在我在 2 台不同的机器上有 2 个客户端连接到它
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
然而,只有一个客户端能够连接并接收来自服务器的发送数据。我假设由于 while
循环它阻塞并阻止其他连接连接,因为它没有 yield
?
我们如何在不阻塞其他连接的情况下将其流式传输到多个客户端?
我考虑过添加更多工作人员,这似乎可以解决问题,但我不明白的是,这不是一个可扩展性很强的解决方案。因为每个客户都是自己的工人,如果您有数千个甚至只有 10 个客户,那么每个客户 10 个工人 1。
那么 Binance 如何进行他们的 websocket 流式传输?或者推特流端点如何工作?
它如何能够为多个并发客户端提供无限流?
因为最终这就是我想要做的
也许是这样的?
import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))
解决这个问题的方法是这样的。
我正在使用 sanic
框架
class Stream:
def __init__(self):
self._connected_clients = set()
async def __call__(self, *args, **kwargs):
await self.stream(*args, **kwargs)
async def stream(self, request, ws):
self._connected_clients.add(ws)
while True:
disconnected_clients = []
for client in self._connected_clients: # check for disconnected clients
if client.state == 3: # append to a list because error will be raised if removed from set while iterating over it
disconnected_clients.append(client)
for client in disconnected_clients: # remove disconnected clients
self._connected_clients.remove(client)
await asyncio.wait([client.send("Hello") for client in self._connected_clients]))
ws_routes.add_websocket_route(Stream(), "/stream")
- 跟踪每个
websocket
会话
- 附加到
list
或 set
- 检查无效的
websocket
会话并从您的 websocket
会话容器中删除
- 做一个
await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]])
基本上是广播。
5.profit!
这基本上就是 pubsub 设计模式
最近我进入了 "crypto mania" 并开始在一些交易所围绕 API 编写自己的包装器。
Binance 特别有一个流式 websocket 端点。
您可以在其中通过 websocket 端点流式传输数据。 我想我会使用 sanic 自己尝试一下。
这是我的 websocket 路由
@ws_routes.websocket("/hello")
async def hello(request, ws):
while True:
await ws.send("hello")
现在我在 2 台不同的机器上有 2 个客户端连接到它
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
然而,只有一个客户端能够连接并接收来自服务器的发送数据。我假设由于 while
循环它阻塞并阻止其他连接连接,因为它没有 yield
?
我们如何在不阻塞其他连接的情况下将其流式传输到多个客户端?
我考虑过添加更多工作人员,这似乎可以解决问题,但我不明白的是,这不是一个可扩展性很强的解决方案。因为每个客户都是自己的工人,如果您有数千个甚至只有 10 个客户,那么每个客户 10 个工人 1。
那么 Binance 如何进行他们的 websocket 流式传输?或者推特流端点如何工作?
它如何能够为多个并发客户端提供无限流? 因为最终这就是我想要做的
也许是这样的?
import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))
解决这个问题的方法是这样的。
我正在使用 sanic
框架
class Stream:
def __init__(self):
self._connected_clients = set()
async def __call__(self, *args, **kwargs):
await self.stream(*args, **kwargs)
async def stream(self, request, ws):
self._connected_clients.add(ws)
while True:
disconnected_clients = []
for client in self._connected_clients: # check for disconnected clients
if client.state == 3: # append to a list because error will be raised if removed from set while iterating over it
disconnected_clients.append(client)
for client in disconnected_clients: # remove disconnected clients
self._connected_clients.remove(client)
await asyncio.wait([client.send("Hello") for client in self._connected_clients]))
ws_routes.add_websocket_route(Stream(), "/stream")
- 跟踪每个
websocket
会话 - 附加到
list
或set
- 检查无效的
websocket
会话并从您的websocket
会话容器中删除 - 做一个
await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]])
基本上是广播。
5.profit!
这基本上就是 pubsub 设计模式