如何在 Python 网络应用程序中为状态更改通知生成服务器发送的事件?
How to generate server-sent events for status change notifications in a Python web app?
我有一个用 CherryPy 编写的网络应用程序:用户上传一个文件,然后开始一些冗长的操作,经过几个阶段。我希望将这些阶段的通知推送到所有连接的客户端。但是我不知道如何在进程之间进行通信。我想我必须在一个单独的进程中启动冗长的操作,但我不知道如何将 "advanced to stage N" 消息传递给 "server-sending function".
从概念上讲,它应该是这样的:
SSEtest.py:
from pathlib import Path
from time import sleep
import cherrypy
def lengthy_operation(name, stream):
for stage in range(10):
print(f'stage {stage}... ', end='')
sleep(2)
print('done')
print('finished')
class SSETest():
@cherrypy.expose
def index(self):
return Path('SSEtest.html').read_text()
@cherrypy.expose
def upload(self, file):
name = file.filename.encode('iso-8859-1').decode('utf-8')
lengthy_operation(name, file.file)
return 'OK'
@cherrypy.expose
def stage(self):
cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'
def lengthy_operation():
for stage in range(5):
yield f'data: stage {stage}... \n\n'
sleep(2)
yield 'data: done\n\n'
yield 'data: finished\n\n'
return lengthy_operation()
stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}
cherrypy.quickstart(SSETest())
SSEtest.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
<form id="load_file_form" action="" enctype="multipart/form-data">
<label for="load_file">Load a file: </label>
<input type="file" id="load_file" name="load_file">
<progress max="100" value="0" id="progress_bar"></progress>
</form>
</div>
<div id="status_messages">
<h3>Stages:</h3>
</div>
<script>
const load_file = document.getElementById('load_file');
const progress_bar = document.getElementById('progress_bar');
function update_progress_bar(event) {
if (event.lengthComputable) {
progress_bar.value = Math.round((event.loaded/event.total)*100);
}
}
load_file.onchange = function (event) {
let the_file = load_file.files[0];
let formData = new FormData();
let connection = new XMLHttpRequest();
formData.append('file', the_file, the_file.name);
connection.open('POST', 'upload', true);
connection.upload.onprogress = update_progress_bar;
connection.onload = function (event) {
if (connection.status != 200) {
alert('Error! ' + event);
}
};
connection.send(formData);
};
const status_messages = document.getElementById("status_messages");
const sse = new EventSource("stage");
sse.onopen = function (event) {
let new_message = document.createElement("p");
new_message.innerHTML = "Connection established: " + event.type;
status_messages.appendChild(new_message);
};
sse.onmessage = function (event) {
let new_message = document.createElement("p");
new_message.innerHTML = event.data;
status_messages.appendChild(new_message);
};
sse.onerror = function(event) {
let new_message = document.createElement("p");
if (event.readyState == EventSource.CLOSED) {
new_message.innerHTML = "Connections closed";
} else {
new_message.innerHTML = "Error: " + event.type;
}
status_messages.appendChild(new_message);
};
</script>
</body>
</html>
我只需要 lengthy_operation()
在上传文件时调用一次。并将其生成的消息发送给所有客户端。现在它可以与本地功能一起使用,这不是我想要的。如何使用外部函数并将其消息传递给 stage()
方法?
I want notifications for these stages to be pushed to all the connected clients.
我怀疑您最终会想要更多的控制权,但我会按原样回答您的问题。稍后,您可能希望以下面的示例为基础,并根据用户的会话、特定的开始时间戳或其他一些相关概念来过滤广播的通知。
每个 "connected client" 都有效地挂在对 /stage
的长 运行 请求上,服务器将使用该请求在 EventSource
上流式传输 events to the client. In your example, each client will begin that request immediately and leave it open until the server terminates the stream. You can also close the stream from the client using close()
。
基本解决方案
您询问了如何让 /stage
处理程序将其事件广播或镜像到所有当前连接的客户端。有许多方法可以完成此操作,但简而言之,您希望 lengthy_operation
函数将 post 事件发送给所有 /stage
处理程序读取器,或者发送到持久共享位置,所有 /stage
处理程序读取。我将展示一种封装上述第一个想法的方法。
考虑序列化为 data: <some message>
:
的通用流事件 class
class StreamEvent:
def __init__(self, message: str) -> bytes:
self.message = message
def serialize(self) -> str:
return f'data: {self.message}\n\n'.encode('utf-8')
以及文件相关流事件的更具体派生案例:
class FileStreamEvent(StreamEvent):
def __init__(self, message: str, name: str):
super().__init__(message)
self.name = name
def serialize(self) -> bytes:
return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')
您可以创建一个非常原始的 publish/subscribe 类型的容器,然后 /stage
可以订阅侦听器并且 lengthy_operation()
可以向所有侦听器发布 StreamEvent
实例:
class StreamSource:
def __init__(self):
self.listeners: List[Queue] = []
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while True:
event = listener.get()
yield event.serialize()
finally:
self.listeners.remove(listener)
在 StreamSource.get()
中,您可能想要创建一个最终案例(例如检查 "close" 或 "finish" 事件)以退出通用 while True
并且您可能想在阻塞 Queue.get()
调用上设置超时。但是为了这个例子,我保留了一切基本内容。
现在,lengthy_operation()
只需要引用 StreamSource
:
def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
for stage in range(10):
events.put(FileStreamEvent(f'stage {stage}: begin', name))
sleep(2)
events.put(FileStreamEvent(f'stage {stage}: end', name))
events.put(FileStreamEvent('finished', name))
SSETest
然后可以为每个 lengthy_operation()
调用提供一个 StreamSource
的共享实例,并且 SSETest.stage()
可以使用 StreamSource.get()
在这个共享上注册一个监听器实例:
class SSETest:
_stream_source: StreamSource = StreamSource()
@cherrypy.expose
def index(self):
return Path('SSETest.html').read_text()
@cherrypy.expose
def upload(self, file):
name = file.filename.encode('iso-8859-1').decode('utf-8')
lengthy_operation(self._stream_source, name, file.file)
return 'OK'
@cherrypy.expose
def stage(self):
cherrypy.response.headers['Cache-Control'] = 'no-cache'
cherrypy.response.headers['Content-Type'] = 'text/event-stream'
def stream():
yield from self._stream_source.get()
return stream()
stage._cp_config = {'response.stream': True}
这是一个完整的[1]示例,说明如何解决您眼前的问题,但您很可能希望在接近最终用户体验时对其进行调整记住了。
[1]:为了便于阅读,我省略了导入,所以它们是:
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from time import sleep
from typing import BinaryIO, List
import cherrypy
后续退出条件
由于您正在使用 cherrypy.quickstart()
,在上面的最小可行解决方案中,您将不得不强制退出 SSETest
服务,因为我没有为您假设任何优雅的 "stop" 行为。第一个解决方案明确指出了这一点,但为了可读性没有提供解决方案。
让我们看看提供一些初始优雅 "stop" 条件的几种方法:
为StreamSource
添加停止条件
首先,至少要给StreamSource
加上一个合理的停止条件。例如,添加一个 running
属性,允许 StreamSource.get()
while
循环正常退出。接下来,设置合理的 Queue.get()
超时,以便循环可以在处理消息之间定期测试此 running
属性。接下来,确保至少有一些相关的 CherryPy 总线消息触发此停止行为。下面,我将所有这些行为都整合到 StreamSource
class 中,但您也可以注册一个单独的应用程序级 CherryPy 插件来处理对 StreamSource.stop()
的调用,而不是使 StreamSource
成为一个插入。当我添加一个单独的信号处理程序时,我将演示它的样子。
class StreamSource(plugins.SimplePlugin):
def __init__(self, bus: wspbus.Bus):
super().__init__(bus)
self.subscribe()
self.running = True
self.listeners: List[Queue] = []
def graceful(self):
self.stop()
def exit(self):
self.stop()
def stop(self):
self.running = False
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while self.running:
try:
event = listener.get(timeout=1.0)
yield event.serialize()
except Empty:
pass
finally:
self.listeners.remove(listener)
现在,SSETest
需要用总线值初始化 StreamSource
,因为 class 现在是 SimplePlugin
:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
您会发现此解决方案使您更接近您可能想要的用户体验。发出键盘中断,CherryPy 将开始停止系统,但第一个优雅的键盘中断不会发布 stop
消息,因此您需要发送第二个键盘中断。
添加一个 SIGINT 处理程序来捕获键盘中断
由于 cherrypy.quickstart
使用信号处理程序的方式,您可能需要注册一个 SIGINT
handler as a CherryPy-compatible SignalHandler
插件以在第一次键盘中断时优雅地停止 StreamSource
。
这是一个例子:
class SignalHandler(plugins.SignalHandler):
def __init__(self, bus: wspbus.Bus, sse):
super().__init__(bus)
self.handlers = {
'SIGINT': self.handle_SIGINT,
}
self.sse = sse
def handle_SIGINT(self):
self.sse.stop()
raise KeyboardInterrupt()
请注意,在这种情况下,我将演示一个通用的应用程序级处理程序,然后您可以通过如下更改启动 cherrypy.quickstart()
逻辑来配置和初始化它:
sse = SSETest()
SignalHandler(cherrypy.engine, sse).subscribe()
cherrypy.quickstart(sse)
对于此示例,我公开了一个通用应用程序 SSETest.stop
方法来封装所需的行为:
class SSETest:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
def stop(self):
self._stream_source.stop()
总结分析
我 不是 CherryPy 用户,我昨天才开始第一次看它只是为了回答你的问题,所以我会离开 "CherryPy best practices"你的自由裁量权。
实际上,您的问题是以下 Python 问题的非常普遍的组合:
- 如何实现简单的 publish/subscribe 模式? (回答
Queue
);
- 如何为订阅者循环创建退出条件? (用
Queue.get()
的 timeout
参数和 running
属性回答)
- 如何用键盘中断影响退出条件? (使用特定于 CherryPy 的信号处理程序进行了回答,但这只是建立在 Python 内置
signal
模块中的概念之上)
您可以通过多种方式解决所有这些问题,有些更倾向于通用 "Pythonic" 解决方案(我的偏好是有意义的),而其他人则利用以 CherryPy 为中心的概念(这在以下情况下有意义)你想增加 CherryPy 的行为而不是重写或破坏它)。
例如,您可以使用 CherryPy 总线消息来传送流消息,但对我来说,这会使您的应用程序逻辑与 CherryPy 特定的功能纠缠得太多,所以我可能会找到一个中间地带来处理您的从我的 StreamSource
示例如何使用标准 Python Queue
模式中可以看出,应用程序的一般特性(以免将自己束缚于 CherryPy)。您可以选择制作 StreamSource
一个插件,以便它可以直接响应某些 CherryPy 总线消息(如我上面所示),或者您可以有一个单独的插件,它知道调用相关的特定于应用程序的域,例如StreamSource.stop()
(类似于我用 SignalHandler
显示的内容)。
最后,你所有的问题都很好,但它们之前都可能作为通用 Python 问题在 SO 上得到了回答,所以当我将这里的答案与你的 CherryPy 问题联系起来时 space我还想帮助您(和未来的读者)了解如何在 CherryPy 之外更抽象地思考这些特定问题。
我有一个用 CherryPy 编写的网络应用程序:用户上传一个文件,然后开始一些冗长的操作,经过几个阶段。我希望将这些阶段的通知推送到所有连接的客户端。但是我不知道如何在进程之间进行通信。我想我必须在一个单独的进程中启动冗长的操作,但我不知道如何将 "advanced to stage N" 消息传递给 "server-sending function".
从概念上讲,它应该是这样的:
SSEtest.py:
from pathlib import Path
from time import sleep
import cherrypy
def lengthy_operation(name, stream):
for stage in range(10):
print(f'stage {stage}... ', end='')
sleep(2)
print('done')
print('finished')
class SSETest():
@cherrypy.expose
def index(self):
return Path('SSEtest.html').read_text()
@cherrypy.expose
def upload(self, file):
name = file.filename.encode('iso-8859-1').decode('utf-8')
lengthy_operation(name, file.file)
return 'OK'
@cherrypy.expose
def stage(self):
cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'
def lengthy_operation():
for stage in range(5):
yield f'data: stage {stage}... \n\n'
sleep(2)
yield 'data: done\n\n'
yield 'data: finished\n\n'
return lengthy_operation()
stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}
cherrypy.quickstart(SSETest())
SSEtest.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
<form id="load_file_form" action="" enctype="multipart/form-data">
<label for="load_file">Load a file: </label>
<input type="file" id="load_file" name="load_file">
<progress max="100" value="0" id="progress_bar"></progress>
</form>
</div>
<div id="status_messages">
<h3>Stages:</h3>
</div>
<script>
const load_file = document.getElementById('load_file');
const progress_bar = document.getElementById('progress_bar');
function update_progress_bar(event) {
if (event.lengthComputable) {
progress_bar.value = Math.round((event.loaded/event.total)*100);
}
}
load_file.onchange = function (event) {
let the_file = load_file.files[0];
let formData = new FormData();
let connection = new XMLHttpRequest();
formData.append('file', the_file, the_file.name);
connection.open('POST', 'upload', true);
connection.upload.onprogress = update_progress_bar;
connection.onload = function (event) {
if (connection.status != 200) {
alert('Error! ' + event);
}
};
connection.send(formData);
};
const status_messages = document.getElementById("status_messages");
const sse = new EventSource("stage");
sse.onopen = function (event) {
let new_message = document.createElement("p");
new_message.innerHTML = "Connection established: " + event.type;
status_messages.appendChild(new_message);
};
sse.onmessage = function (event) {
let new_message = document.createElement("p");
new_message.innerHTML = event.data;
status_messages.appendChild(new_message);
};
sse.onerror = function(event) {
let new_message = document.createElement("p");
if (event.readyState == EventSource.CLOSED) {
new_message.innerHTML = "Connections closed";
} else {
new_message.innerHTML = "Error: " + event.type;
}
status_messages.appendChild(new_message);
};
</script>
</body>
</html>
我只需要 lengthy_operation()
在上传文件时调用一次。并将其生成的消息发送给所有客户端。现在它可以与本地功能一起使用,这不是我想要的。如何使用外部函数并将其消息传递给 stage()
方法?
I want notifications for these stages to be pushed to all the connected clients.
我怀疑您最终会想要更多的控制权,但我会按原样回答您的问题。稍后,您可能希望以下面的示例为基础,并根据用户的会话、特定的开始时间戳或其他一些相关概念来过滤广播的通知。
每个 "connected client" 都有效地挂在对 /stage
的长 运行 请求上,服务器将使用该请求在 EventSource
上流式传输 events to the client. In your example, each client will begin that request immediately and leave it open until the server terminates the stream. You can also close the stream from the client using close()
。
基本解决方案
您询问了如何让 /stage
处理程序将其事件广播或镜像到所有当前连接的客户端。有许多方法可以完成此操作,但简而言之,您希望 lengthy_operation
函数将 post 事件发送给所有 /stage
处理程序读取器,或者发送到持久共享位置,所有 /stage
处理程序读取。我将展示一种封装上述第一个想法的方法。
考虑序列化为 data: <some message>
:
class StreamEvent:
def __init__(self, message: str) -> bytes:
self.message = message
def serialize(self) -> str:
return f'data: {self.message}\n\n'.encode('utf-8')
以及文件相关流事件的更具体派生案例:
class FileStreamEvent(StreamEvent):
def __init__(self, message: str, name: str):
super().__init__(message)
self.name = name
def serialize(self) -> bytes:
return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')
您可以创建一个非常原始的 publish/subscribe 类型的容器,然后 /stage
可以订阅侦听器并且 lengthy_operation()
可以向所有侦听器发布 StreamEvent
实例:
class StreamSource:
def __init__(self):
self.listeners: List[Queue] = []
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while True:
event = listener.get()
yield event.serialize()
finally:
self.listeners.remove(listener)
在 StreamSource.get()
中,您可能想要创建一个最终案例(例如检查 "close" 或 "finish" 事件)以退出通用 while True
并且您可能想在阻塞 Queue.get()
调用上设置超时。但是为了这个例子,我保留了一切基本内容。
现在,lengthy_operation()
只需要引用 StreamSource
:
def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
for stage in range(10):
events.put(FileStreamEvent(f'stage {stage}: begin', name))
sleep(2)
events.put(FileStreamEvent(f'stage {stage}: end', name))
events.put(FileStreamEvent('finished', name))
SSETest
然后可以为每个 lengthy_operation()
调用提供一个 StreamSource
的共享实例,并且 SSETest.stage()
可以使用 StreamSource.get()
在这个共享上注册一个监听器实例:
class SSETest:
_stream_source: StreamSource = StreamSource()
@cherrypy.expose
def index(self):
return Path('SSETest.html').read_text()
@cherrypy.expose
def upload(self, file):
name = file.filename.encode('iso-8859-1').decode('utf-8')
lengthy_operation(self._stream_source, name, file.file)
return 'OK'
@cherrypy.expose
def stage(self):
cherrypy.response.headers['Cache-Control'] = 'no-cache'
cherrypy.response.headers['Content-Type'] = 'text/event-stream'
def stream():
yield from self._stream_source.get()
return stream()
stage._cp_config = {'response.stream': True}
这是一个完整的[1]示例,说明如何解决您眼前的问题,但您很可能希望在接近最终用户体验时对其进行调整记住了。
[1]:为了便于阅读,我省略了导入,所以它们是:
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from time import sleep
from typing import BinaryIO, List
import cherrypy
后续退出条件
由于您正在使用 cherrypy.quickstart()
,在上面的最小可行解决方案中,您将不得不强制退出 SSETest
服务,因为我没有为您假设任何优雅的 "stop" 行为。第一个解决方案明确指出了这一点,但为了可读性没有提供解决方案。
让我们看看提供一些初始优雅 "stop" 条件的几种方法:
为StreamSource
添加停止条件
首先,至少要给StreamSource
加上一个合理的停止条件。例如,添加一个 running
属性,允许 StreamSource.get()
while
循环正常退出。接下来,设置合理的 Queue.get()
超时,以便循环可以在处理消息之间定期测试此 running
属性。接下来,确保至少有一些相关的 CherryPy 总线消息触发此停止行为。下面,我将所有这些行为都整合到 StreamSource
class 中,但您也可以注册一个单独的应用程序级 CherryPy 插件来处理对 StreamSource.stop()
的调用,而不是使 StreamSource
成为一个插入。当我添加一个单独的信号处理程序时,我将演示它的样子。
class StreamSource(plugins.SimplePlugin):
def __init__(self, bus: wspbus.Bus):
super().__init__(bus)
self.subscribe()
self.running = True
self.listeners: List[Queue] = []
def graceful(self):
self.stop()
def exit(self):
self.stop()
def stop(self):
self.running = False
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while self.running:
try:
event = listener.get(timeout=1.0)
yield event.serialize()
except Empty:
pass
finally:
self.listeners.remove(listener)
现在,SSETest
需要用总线值初始化 StreamSource
,因为 class 现在是 SimplePlugin
:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
您会发现此解决方案使您更接近您可能想要的用户体验。发出键盘中断,CherryPy 将开始停止系统,但第一个优雅的键盘中断不会发布 stop
消息,因此您需要发送第二个键盘中断。
添加一个 SIGINT 处理程序来捕获键盘中断
由于 cherrypy.quickstart
使用信号处理程序的方式,您可能需要注册一个 SIGINT
handler as a CherryPy-compatible SignalHandler
插件以在第一次键盘中断时优雅地停止 StreamSource
。
这是一个例子:
class SignalHandler(plugins.SignalHandler):
def __init__(self, bus: wspbus.Bus, sse):
super().__init__(bus)
self.handlers = {
'SIGINT': self.handle_SIGINT,
}
self.sse = sse
def handle_SIGINT(self):
self.sse.stop()
raise KeyboardInterrupt()
请注意,在这种情况下,我将演示一个通用的应用程序级处理程序,然后您可以通过如下更改启动 cherrypy.quickstart()
逻辑来配置和初始化它:
sse = SSETest()
SignalHandler(cherrypy.engine, sse).subscribe()
cherrypy.quickstart(sse)
对于此示例,我公开了一个通用应用程序 SSETest.stop
方法来封装所需的行为:
class SSETest:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
def stop(self):
self._stream_source.stop()
总结分析
我 不是 CherryPy 用户,我昨天才开始第一次看它只是为了回答你的问题,所以我会离开 "CherryPy best practices"你的自由裁量权。
实际上,您的问题是以下 Python 问题的非常普遍的组合:
- 如何实现简单的 publish/subscribe 模式? (回答
Queue
); - 如何为订阅者循环创建退出条件? (用
Queue.get()
的timeout
参数和running
属性回答) - 如何用键盘中断影响退出条件? (使用特定于 CherryPy 的信号处理程序进行了回答,但这只是建立在 Python 内置
signal
模块中的概念之上)
您可以通过多种方式解决所有这些问题,有些更倾向于通用 "Pythonic" 解决方案(我的偏好是有意义的),而其他人则利用以 CherryPy 为中心的概念(这在以下情况下有意义)你想增加 CherryPy 的行为而不是重写或破坏它)。
例如,您可以使用 CherryPy 总线消息来传送流消息,但对我来说,这会使您的应用程序逻辑与 CherryPy 特定的功能纠缠得太多,所以我可能会找到一个中间地带来处理您的从我的 StreamSource
示例如何使用标准 Python Queue
模式中可以看出,应用程序的一般特性(以免将自己束缚于 CherryPy)。您可以选择制作 StreamSource
一个插件,以便它可以直接响应某些 CherryPy 总线消息(如我上面所示),或者您可以有一个单独的插件,它知道调用相关的特定于应用程序的域,例如StreamSource.stop()
(类似于我用 SignalHandler
显示的内容)。
最后,你所有的问题都很好,但它们之前都可能作为通用 Python 问题在 SO 上得到了回答,所以当我将这里的答案与你的 CherryPy 问题联系起来时 space我还想帮助您(和未来的读者)了解如何在 CherryPy 之外更抽象地思考这些特定问题。