Sanic Web 服务器:Websocket 处理程序在 return 上关闭套接字;循环中断其他请求处理程序
Sanic Webserver: Websocket handler closes socket on return; looping breaks other request handlers
场景:我有一个服务于一个简单网站的 sanic 网络服务器。网站基本就是一个大数据table in html 有vue模板支持。由于 table 条目每隔几分钟更改一次,因此数据通过 websocket 在更改时传送。同时约有2000个用户。我试图实现一个 pub/sub 架构。
问题:一旦我的 sanic 处理程序 returns,我的 websockets 就关闭了。我可以在里面有一个循环来保持处理程序打开。但是保持 2000 个处理程序打开听起来像是一个坏主意......而且打开的处理程序表现得很奇怪。一个线程或一个小线程池应该可以完成这项工作。也许我弄错了 sanic 文档,需要设计建议。
我尝试过的事情:
- 将超时设置增加到足够高
- 在 sanic 中尝试各种其他 websocket 设置
- 让我的客户端 js return false onmessage (Javascript websockets closing immediately after opening)
- 传递后将 ws 引用设置为 null
Sanic Webserver 的索引:
@app.route('/')
async def serve_index(request):
return await file(os.path.join(os.path.dirname(__file__), 'index.html'))
Index.html 的 JS:
var app = new Vue({
el: '#app',
data() {
manydata0: 0,
manydata1: 0,
ws: null,
}
},
methods: {
update: function (json_data) {
json = JSON.parse(json_data);
this.manydata0 = json['data0'];
this.manydata1 = json['data1'];
}
},
created: function () {
this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
messages = document.createElement('ul');
this.ws.onmessage = function (event) {
console.log("new data")
app.update(event.data);
return false;
};
document.body.appendChild(messages);
this.ws.onclose = function (event) {
console.log("closed :(")
};
Sanic Webserver 的 Websocket 处理程序(第 1 版,套接字立即终止):
@app.websocket('/reload')
async def feed(request, ws):
#time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
await ws.send(Path(json).read_text()) # serve initial data
connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates
Sanic Webservers 的 Websocket 处理程序(第 2 版,处理程序阻止其他请求处理程序)
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
except Exception as e:
print("Exception while checking file: ", e)
# this stops the server to handle other @app.routes like css, fonts, favicon
Sanic Webservers 的 Websocket 处理程序(第 3 版,不必要的 recv())
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
await recv() # if the client sends from time to time all is fine
except Exception as e:
print("Exception while checking file: ", e)
最后两个代码片段差别不大。我添加了一个 ws.recv() 并从客户端发送了一些合适的东西(例如,在一个时间间隔内),然后一切正常。然后 css,发送字体和图标。但这不是故意的,不是吗?这应该不能很好地扩展,对吧?
总而言之,这对我来说意义不大。我误会了什么?
这里是 Sanic core-devs 之一。
首先,对于pubsub类型架构的示例,here is a gist我准备了。我认为这可能会有所帮助。
我的基本想法是创建一个单独的 Feed
对象,它在自己的任务中循环查找事件。在我的例子中,它是从 pubsub 接收信息。在您的情况下,它应该检查 JSON 文档上的时间。
然后,当 Feed.receiver
触发事件时,它会向所有正在侦听的客户端发出 ping 命令。
在 websocket
处理程序本身中,您想保持打开状态。如果不这样做,则连接将关闭。如果你不关心从客户端接收信息,你不需要使用await recv()
。
所以,在你的情况下,使用 SUPER 简单的逻辑,我会做类似下面的事情。
这是未经测试的代码,可能需要一些调整
import os
import random
import string
from functools import partial
from pathlib import Path
from sanic import Sanic
import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set
app = Sanic(__name__)
FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20
def generate_code(length=12, include_punctuation=False):
characters = string.ascii_letters + string.digits
if include_punctuation:
characters += string.punctuation
return "".join(random.choice(characters) for x in range(length))
@dataclass
class Client:
interface: websockets.server.WebSocketServerProtocol = field(repr=False)
sid: str = field(default_factory=partial(generate_code, 36))
def __hash__(self):
return hash(str(self))
async def keep_alive(self) -> None:
while True:
try:
try:
pong_waiter = await self.interface.ping()
await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
except asyncio.TimeoutError:
print("NO PONG!!")
await self.feed.unregister(self)
else:
print(f"ping: {self.sid} on <{self.feed.name}>")
await asyncio.sleep(INTERVAL)
except websockets.exceptions.ConnectionClosed:
print(f"broken connection: {self.sid} on <{self.feed.name}>")
await self.feed.unregister(self)
break
async def shutdown(self) -> None:
self.interface.close()
async def run(self) -> None:
try:
self.feed.app.add_task(self.keep_alive())
while True:
pass
except websockets.exceptions.ConnectionClosed:
print("connection closed")
finally:
await self.feed.unregister(self)
class Feed:
app: Sanic
clients: Set[Client]
cached = None
def __init__(self, app: Sanic):
self.clients = set()
self.app = app
@classmethod
async def get(cls, app: Sanic):
is_existing = False
if cls.cached:
is_existing = True
feed = cls.cached
else:
feed = cls(app)
cls.cached = feed
if not is_existing:
feed.app.add_task(feed.receiver())
return feed, is_existing
async def receiver(self) -> None:
print("Feed receiver started")
mod_time = 0
while True:
try:
stat = os.stat(FILE)
print(f"times: {mod_time} | {stat.st_mtime}")
if mod_time != stat.st_mtime:
content = self.get_file_contents()
for client in self.clients:
try:
print(f"\tSending to {client.sid}")
await client.interface.send(content)
except websockets.exceptions.ConnectionClosed:
print(f"ConnectionClosed. Client {client.sid}")
except Exception as e:
print("Exception while checking file: ", e)
async def register(
self, websocket: websockets.server.WebSocketServerProtocol
) -> Optional[Client]:
client = Client(interface=websocket)
print(f">>> register {client}")
client.feed = self
self.clients.add(client)
# Send initial content
content = self.get_file_contents()
client.interface.send(content)
print(f"\nAll clients\n{self.clients}\n\n")
return client
async def unregister(self, client: Client) -> None:
print(f">>> unregister {client} on <{self.name}>")
if client in self.clients:
await client.shutdown()
self.clients.remove(client)
print(f"\nAll remaining clients\n{self.clients}\n\n")
def get_file_contents(self):
return Path(FILE).read_text()
@app.websocket("/reload")
async def feed(request, ws):
feed, is_existing = await Feed.get(app)
client = await feed.register(ws)
await client.run()
if __name__ == "__main__":
app.run(debug=True, port=7777)
场景:我有一个服务于一个简单网站的 sanic 网络服务器。网站基本就是一个大数据table in html 有vue模板支持。由于 table 条目每隔几分钟更改一次,因此数据通过 websocket 在更改时传送。同时约有2000个用户。我试图实现一个 pub/sub 架构。
问题:一旦我的 sanic 处理程序 returns,我的 websockets 就关闭了。我可以在里面有一个循环来保持处理程序打开。但是保持 2000 个处理程序打开听起来像是一个坏主意......而且打开的处理程序表现得很奇怪。一个线程或一个小线程池应该可以完成这项工作。也许我弄错了 sanic 文档,需要设计建议。
我尝试过的事情: - 将超时设置增加到足够高 - 在 sanic 中尝试各种其他 websocket 设置 - 让我的客户端 js return false onmessage (Javascript websockets closing immediately after opening) - 传递后将 ws 引用设置为 null
Sanic Webserver 的索引:
@app.route('/')
async def serve_index(request):
return await file(os.path.join(os.path.dirname(__file__), 'index.html'))
Index.html 的 JS:
var app = new Vue({
el: '#app',
data() {
manydata0: 0,
manydata1: 0,
ws: null,
}
},
methods: {
update: function (json_data) {
json = JSON.parse(json_data);
this.manydata0 = json['data0'];
this.manydata1 = json['data1'];
}
},
created: function () {
this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
messages = document.createElement('ul');
this.ws.onmessage = function (event) {
console.log("new data")
app.update(event.data);
return false;
};
document.body.appendChild(messages);
this.ws.onclose = function (event) {
console.log("closed :(")
};
Sanic Webserver 的 Websocket 处理程序(第 1 版,套接字立即终止):
@app.websocket('/reload')
async def feed(request, ws):
#time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
await ws.send(Path(json).read_text()) # serve initial data
connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates
Sanic Webservers 的 Websocket 处理程序(第 2 版,处理程序阻止其他请求处理程序)
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
except Exception as e:
print("Exception while checking file: ", e)
# this stops the server to handle other @app.routes like css, fonts, favicon
Sanic Webservers 的 Websocket 处理程序(第 3 版,不必要的 recv())
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
await recv() # if the client sends from time to time all is fine
except Exception as e:
print("Exception while checking file: ", e)
最后两个代码片段差别不大。我添加了一个 ws.recv() 并从客户端发送了一些合适的东西(例如,在一个时间间隔内),然后一切正常。然后 css,发送字体和图标。但这不是故意的,不是吗?这应该不能很好地扩展,对吧?
总而言之,这对我来说意义不大。我误会了什么?
这里是 Sanic core-devs 之一。
首先,对于pubsub类型架构的示例,here is a gist我准备了。我认为这可能会有所帮助。
我的基本想法是创建一个单独的 Feed
对象,它在自己的任务中循环查找事件。在我的例子中,它是从 pubsub 接收信息。在您的情况下,它应该检查 JSON 文档上的时间。
然后,当 Feed.receiver
触发事件时,它会向所有正在侦听的客户端发出 ping 命令。
在 websocket
处理程序本身中,您想保持打开状态。如果不这样做,则连接将关闭。如果你不关心从客户端接收信息,你不需要使用await recv()
。
所以,在你的情况下,使用 SUPER 简单的逻辑,我会做类似下面的事情。
这是未经测试的代码,可能需要一些调整
import os
import random
import string
from functools import partial
from pathlib import Path
from sanic import Sanic
import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set
app = Sanic(__name__)
FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20
def generate_code(length=12, include_punctuation=False):
characters = string.ascii_letters + string.digits
if include_punctuation:
characters += string.punctuation
return "".join(random.choice(characters) for x in range(length))
@dataclass
class Client:
interface: websockets.server.WebSocketServerProtocol = field(repr=False)
sid: str = field(default_factory=partial(generate_code, 36))
def __hash__(self):
return hash(str(self))
async def keep_alive(self) -> None:
while True:
try:
try:
pong_waiter = await self.interface.ping()
await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
except asyncio.TimeoutError:
print("NO PONG!!")
await self.feed.unregister(self)
else:
print(f"ping: {self.sid} on <{self.feed.name}>")
await asyncio.sleep(INTERVAL)
except websockets.exceptions.ConnectionClosed:
print(f"broken connection: {self.sid} on <{self.feed.name}>")
await self.feed.unregister(self)
break
async def shutdown(self) -> None:
self.interface.close()
async def run(self) -> None:
try:
self.feed.app.add_task(self.keep_alive())
while True:
pass
except websockets.exceptions.ConnectionClosed:
print("connection closed")
finally:
await self.feed.unregister(self)
class Feed:
app: Sanic
clients: Set[Client]
cached = None
def __init__(self, app: Sanic):
self.clients = set()
self.app = app
@classmethod
async def get(cls, app: Sanic):
is_existing = False
if cls.cached:
is_existing = True
feed = cls.cached
else:
feed = cls(app)
cls.cached = feed
if not is_existing:
feed.app.add_task(feed.receiver())
return feed, is_existing
async def receiver(self) -> None:
print("Feed receiver started")
mod_time = 0
while True:
try:
stat = os.stat(FILE)
print(f"times: {mod_time} | {stat.st_mtime}")
if mod_time != stat.st_mtime:
content = self.get_file_contents()
for client in self.clients:
try:
print(f"\tSending to {client.sid}")
await client.interface.send(content)
except websockets.exceptions.ConnectionClosed:
print(f"ConnectionClosed. Client {client.sid}")
except Exception as e:
print("Exception while checking file: ", e)
async def register(
self, websocket: websockets.server.WebSocketServerProtocol
) -> Optional[Client]:
client = Client(interface=websocket)
print(f">>> register {client}")
client.feed = self
self.clients.add(client)
# Send initial content
content = self.get_file_contents()
client.interface.send(content)
print(f"\nAll clients\n{self.clients}\n\n")
return client
async def unregister(self, client: Client) -> None:
print(f">>> unregister {client} on <{self.name}>")
if client in self.clients:
await client.shutdown()
self.clients.remove(client)
print(f"\nAll remaining clients\n{self.clients}\n\n")
def get_file_contents(self):
return Path(FILE).read_text()
@app.websocket("/reload")
async def feed(request, ws):
feed, is_existing = await Feed.get(app)
client = await feed.register(ws)
await client.run()
if __name__ == "__main__":
app.run(debug=True, port=7777)