如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播
How to mount socket.io to fastapi app and send broadcast to all connected clients
我尝试创建使用 websockets 并可以向所有连接的客户端广播消息的 fastapi 应用程序。我发现使用 websockets 是不可能的,但找到了出色的库 - socket.io。但是我不确定如何使用它并将它与我现有的 fastapi 应用程序集成。
# server.py
from typing import Any
import uvicorn
from fastapi import FastAPI
import socketio
sio: Any = socketio.AsyncServer(async_mode="asgi")
socket_app = socketio.ASGIApp(sio)
app = FastAPI()
@app.get("/test")
async def test():
print("test")
return "WORKS"
app.mount("/", socket_app) # Here we mount socket app to main fastapi app
@sio.on("connect")
async def connect(sid, env):
print("on connect")
@sio.on("direct")
async def direct(sid, msg):
print(f"direct {msg}")
await sio.emit("event_name", msg, room=sid) # we can send message to specific sid
@sio.on("broadcast")
async def broadcast(sid, msg):
print(f"broadcast {msg}")
await sio.emit("event_name", msg) # or send to everyone
@sio.on("disconnect")
async def disconnect(sid):
print("on disconnect")
if __name__ == "__main__":
kwargs = {"host": "0.0.0.0", "port": 5000}
kwargs.update({"debug": True, "reload": True})
uvicorn.run("server:app", **kwargs)
# client.py
import requests
import socketio
r = requests.get("http://127.0.0.1:5000/test") # server prints "test"
cl = socketio.Client()
cl2 = socketio.Client()
@cl.on("event_name")
def foo(data):
print(f"client 1 {data}")
@cl2.on("event_name")
def foo2(data):
print(f"client 2 {data}")
cl.connect("http://127.0.0.1:5000/") # server prints "on connect"
cl2.connect("http://127.0.0.1:5000/")
cl.emit("direct", "msg_1") # prints client 1 msg_1
cl2.emit("broadcast", "msg_2") # prints client 2 msg_2 and client 1 msg_2
最后安装适当的依赖项:
# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install requests websocket-client
我尝试创建使用 websockets 并可以向所有连接的客户端广播消息的 fastapi 应用程序。我发现使用 websockets 是不可能的,但找到了出色的库 - socket.io。但是我不确定如何使用它并将它与我现有的 fastapi 应用程序集成。
# server.py
from typing import Any
import uvicorn
from fastapi import FastAPI
import socketio
sio: Any = socketio.AsyncServer(async_mode="asgi")
socket_app = socketio.ASGIApp(sio)
app = FastAPI()
@app.get("/test")
async def test():
print("test")
return "WORKS"
app.mount("/", socket_app) # Here we mount socket app to main fastapi app
@sio.on("connect")
async def connect(sid, env):
print("on connect")
@sio.on("direct")
async def direct(sid, msg):
print(f"direct {msg}")
await sio.emit("event_name", msg, room=sid) # we can send message to specific sid
@sio.on("broadcast")
async def broadcast(sid, msg):
print(f"broadcast {msg}")
await sio.emit("event_name", msg) # or send to everyone
@sio.on("disconnect")
async def disconnect(sid):
print("on disconnect")
if __name__ == "__main__":
kwargs = {"host": "0.0.0.0", "port": 5000}
kwargs.update({"debug": True, "reload": True})
uvicorn.run("server:app", **kwargs)
# client.py
import requests
import socketio
r = requests.get("http://127.0.0.1:5000/test") # server prints "test"
cl = socketio.Client()
cl2 = socketio.Client()
@cl.on("event_name")
def foo(data):
print(f"client 1 {data}")
@cl2.on("event_name")
def foo2(data):
print(f"client 2 {data}")
cl.connect("http://127.0.0.1:5000/") # server prints "on connect"
cl2.connect("http://127.0.0.1:5000/")
cl.emit("direct", "msg_1") # prints client 1 msg_1
cl2.emit("broadcast", "msg_2") # prints client 2 msg_2 and client 1 msg_2
最后安装适当的依赖项:
# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install requests websocket-client