FastAPI WebSocket 复制
FastAPI WebSocket replication
我已经使用 FastAPI 实现了一个简单的 WebSocket 代理(使用 this example)
应用程序的目标是将它收到的所有消息传递给它的活动连接(代理)。
它只适用于单个实例,因为它在内存中保持活动的 WebSocket 连接。并且当有一个以上的实例时不共享内存。
我天真的方法是通过在某些共享存储 (Redis) 中保持活动连接来解决它。但我一直坚持腌制它。
这是完整的应用程序:
import pickle
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis
app = FastAPI()
rds = redis.StrictRedis('localhost')
class ConnectionManager:
def __init__(self):
self.active_connections = defaultdict(dict)
async def connect(self, websocket: WebSocket, application: str, client_id: str):
await websocket.accept()
if application not in self.active_connections:
self.active_connections[application] = defaultdict(list)
self.active_connections[application][client_id].append(websocket)
#### this is my attempt to store connections ####
rds.set('connections', pickle.dumps(self.active_connections))
def disconnect(self, websocket: WebSocket, application: str, client_id: str):
self.active_connections[application][client_id].remove(websocket)
async def broadcast(self, message: dict, application: str, client_id: str):
for connection in self.active_connections[application][client_id]:
try:
await connection.send_json(message)
print(f"sent {message}")
except Exception as e:
pass
manager = ConnectionManager()
@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
await manager.connect(websocket, application, client_id)
while True:
try:
data = await websocket.receive_json()
print(f"received: {data}")
await manager.broadcast(data, application, client_id)
except WebSocketDisconnect:
manager.disconnect(websocket, application, client_id)
except RuntimeError:
break
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8005)
然而,pickling websocket 连接不成功:
AttributeError: Can't pickle local object 'FastAPI.setup.<locals>.openapi'
跨应用程序实例存储 WebSocket 连接的正确方法是什么?
UPD 每个@AKX 回答的实际解决方案。
服务器的每个实例都订阅了 Redis pubsub,并尝试将接收到的消息发送给所有连接的客户端。
由于一个客户端不能连接到多个实例 - 每条消息只能传递给每个客户端一次
import json
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis
app = FastAPI()
rds = redis.StrictRedis('localhost')
class ConnectionManager:
def __init__(self):
self.active_connections = defaultdict(dict)
async def connect(self, websocket: WebSocket, application: str, client_id: str):
await websocket.accept()
if application not in self.active_connections:
self.active_connections[application] = defaultdict(list)
self.active_connections[application][client_id].append(websocket)
def disconnect(self, websocket: WebSocket, application: str, client_id: str):
self.active_connections[application][client_id].remove(websocket)
async def broadcast(self, message: dict, application: str, client_id: str):
for connection in self.active_connections[application][client_id]:
try:
await connection.send_json(message)
print(f"sent {message}")
except Exception as e:
pass
async def consume(self):
print("started to consume")
sub = rds.pubsub()
sub.subscribe('channel')
while True:
await asyncio.sleep(0.01)
message = sub.get_message(ignore_subscribe_messages=True)
if message is not None and isinstance(message, dict):
msg = json.loads(message.get('data'))
await self.broadcast(msg['message'], msg['application'], msg['client_id'])
manager = ConnectionManager()
@app.on_event("startup")
async def subscribe():
asyncio.create_task(manager.consume())
@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
await manager.connect(websocket, application, client_id)
while True:
try:
data = await websocket.receive_json()
print(f"received: {data}")
rds.publish(
'channel',
json.dumps({
'application': application,
'client_id': client_id,
'message': data
})
)
except WebSocketDisconnect:
manager.disconnect(websocket, application, client_id)
except RuntimeError:
break
if __name__ == '__main__': # pragma: no cover
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8005)
What is the proper way to have WebSocket connections stored across the application instances?
据我所知,没有让多个进程共享 websocket 连接的实用方法。正如您所注意到的,您不能 pickle 连接(尤其是因为您不能 pickle 表示网络连接的实际 OS-level 文件描述符)。您 可以 使用一些 POSIX 魔法将文件描述符发送到其他进程,但即便如此,您还需要确保进程知道 websocket 的状态并且不例如竞相发送或接收数据。
我可能会重新设计一些东西,让一个单一的进程来管理 websocket 连接,例如使用 Redis(因为您已经拥有它)pubsub or streams 与您拥有的其他多个进程进行通信。
我已经使用 FastAPI 实现了一个简单的 WebSocket 代理(使用 this example)
应用程序的目标是将它收到的所有消息传递给它的活动连接(代理)。
它只适用于单个实例,因为它在内存中保持活动的 WebSocket 连接。并且当有一个以上的实例时不共享内存。
我天真的方法是通过在某些共享存储 (Redis) 中保持活动连接来解决它。但我一直坚持腌制它。
这是完整的应用程序:
import pickle
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis
app = FastAPI()
rds = redis.StrictRedis('localhost')
class ConnectionManager:
def __init__(self):
self.active_connections = defaultdict(dict)
async def connect(self, websocket: WebSocket, application: str, client_id: str):
await websocket.accept()
if application not in self.active_connections:
self.active_connections[application] = defaultdict(list)
self.active_connections[application][client_id].append(websocket)
#### this is my attempt to store connections ####
rds.set('connections', pickle.dumps(self.active_connections))
def disconnect(self, websocket: WebSocket, application: str, client_id: str):
self.active_connections[application][client_id].remove(websocket)
async def broadcast(self, message: dict, application: str, client_id: str):
for connection in self.active_connections[application][client_id]:
try:
await connection.send_json(message)
print(f"sent {message}")
except Exception as e:
pass
manager = ConnectionManager()
@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
await manager.connect(websocket, application, client_id)
while True:
try:
data = await websocket.receive_json()
print(f"received: {data}")
await manager.broadcast(data, application, client_id)
except WebSocketDisconnect:
manager.disconnect(websocket, application, client_id)
except RuntimeError:
break
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8005)
然而,pickling websocket 连接不成功:
AttributeError: Can't pickle local object 'FastAPI.setup.<locals>.openapi'
跨应用程序实例存储 WebSocket 连接的正确方法是什么?
UPD 每个@AKX 回答的实际解决方案。
服务器的每个实例都订阅了 Redis pubsub,并尝试将接收到的消息发送给所有连接的客户端。
由于一个客户端不能连接到多个实例 - 每条消息只能传递给每个客户端一次
import json
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from collections import defaultdict
import redis
app = FastAPI()
rds = redis.StrictRedis('localhost')
class ConnectionManager:
def __init__(self):
self.active_connections = defaultdict(dict)
async def connect(self, websocket: WebSocket, application: str, client_id: str):
await websocket.accept()
if application not in self.active_connections:
self.active_connections[application] = defaultdict(list)
self.active_connections[application][client_id].append(websocket)
def disconnect(self, websocket: WebSocket, application: str, client_id: str):
self.active_connections[application][client_id].remove(websocket)
async def broadcast(self, message: dict, application: str, client_id: str):
for connection in self.active_connections[application][client_id]:
try:
await connection.send_json(message)
print(f"sent {message}")
except Exception as e:
pass
async def consume(self):
print("started to consume")
sub = rds.pubsub()
sub.subscribe('channel')
while True:
await asyncio.sleep(0.01)
message = sub.get_message(ignore_subscribe_messages=True)
if message is not None and isinstance(message, dict):
msg = json.loads(message.get('data'))
await self.broadcast(msg['message'], msg['application'], msg['client_id'])
manager = ConnectionManager()
@app.on_event("startup")
async def subscribe():
asyncio.create_task(manager.consume())
@app.websocket("/ws/channel/{application}/{client_id}/")
async def websocket_endpoint(websocket: WebSocket, application: str, client_id: str):
await manager.connect(websocket, application, client_id)
while True:
try:
data = await websocket.receive_json()
print(f"received: {data}")
rds.publish(
'channel',
json.dumps({
'application': application,
'client_id': client_id,
'message': data
})
)
except WebSocketDisconnect:
manager.disconnect(websocket, application, client_id)
except RuntimeError:
break
if __name__ == '__main__': # pragma: no cover
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8005)
What is the proper way to have WebSocket connections stored across the application instances?
据我所知,没有让多个进程共享 websocket 连接的实用方法。正如您所注意到的,您不能 pickle 连接(尤其是因为您不能 pickle 表示网络连接的实际 OS-level 文件描述符)。您 可以 使用一些 POSIX 魔法将文件描述符发送到其他进程,但即便如此,您还需要确保进程知道 websocket 的状态并且不例如竞相发送或接收数据。
我可能会重新设计一些东西,让一个单一的进程来管理 websocket 连接,例如使用 Redis(因为您已经拥有它)pubsub or streams 与您拥有的其他多个进程进行通信。