服务器发送的事件未被所有网络客户端接收
Server-sent-events not received by all web-clients
我有一个生成服务器发送事件 (sse) 的 Flask 网络服务器,所有连接的网络客户端都应该接收到这些事件。
在下面的“版本 1”中有效。所有网络客户端都会收到事件,并进行相应更新。
在下面的“版本 2”中,这是对版本 1 的重构,这不再按预期工作:
相反,我得到:
- 大多数情况下只有一个 Web 客户端获取事件,或者
- 很少
多个网络客户端获取事件,或者
- 很少 none
网络客户端获取事件
据我所知,服务器始终在生成事件,并且通常至少有一个客户端正在接收。
我的初始测试在 Raspberry Pi 3 上托管网络服务器,在 Pi 上使用网络客户端,在 Windows 和 OSX 上使用各种浏览器。
为了消除任何可能的网络问题,我对网络服务器和 3 个 Chrome 实例重复了相同的测试,这些实例都托管在同一台 OSX 笔记本电脑上。
这给出了相同的结果:版本 1 "OK"、版本 2 "NOT OK".
成功接收的客户端似乎因事件而异:到目前为止我无法辨别模式。
版本 1 和版本 2 的结构 change_objects
包含 "things that should be tracked for changes"
在版本 1 中 change_objects
是一个字典的字典。
在第 2 版中,我将 change_objects
重构为 class Reporter
的实例列表,或者 class 的子 [=] 15=].
对 "things" 的更改是根据代码中其他地方收到的网络服务触发的。
版本 1(确定:所有 Web 客户端都收到 sse 事件)
def check_walk(walk_new, walk_old):
if walk_new != walk_old:
print("walk change", walk_old, walk_new)
return True, walk_new
else:
return False, walk_old
def walk_event(walk):
silliness = walk['silliness']
data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
return "data: {}\n\n".format(data)
change_objects = {
"walk1": {
"object": walks[0],
"checker": check_walk,
"event": walk_event,
},
... more things to be tracked...
}
def event_stream(change_objects):
copies = {}
for key, value in change_objects.items():
copies[key] = {"obj_old": deepcopy(value["object"])} # ensure a true copy, not a reference!
while True:
gevent.sleep(0.5)
for key, value in change_objects.items():
obj_new = deepcopy(value["object"]) # use same version in check and yield functions
obj_changed, copies[key]["obj_old"] = value["checker"](obj_new, copies[key]["obj_old"])
if (obj_changed):
yield value["event"](obj_new)
@app.route('/server_events')
def sse_request():
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
版本 2(不正常:sse 事件并不总是被所有网络客户端接收)
class Reporter:
def __init__(self, reportee, name):
self._setup(reportee, name)
def _setup(self, reportee, name):
self.old = self.truecopy(reportee)
self.new = reportee
self.name = "{}_change".format(name)
def truecopy(self, orig):
return deepcopy(orig)
def changed(self):
if self.new != self.old:
self.old = self.truecopy(self.new)
return True
else:
return False
def sse_event(self):
data = self.new.copy()
data['type'] = self.name
data = json.dumps(data)
return "data: {}\n\n".format(data)
class WalkReporter(Reporter):
# as we are only interested in changes to attribute "silliness" (not other attributes) --> override superclass sse_event
def sse_event(self):
silliness = self.new['silliness']
data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
return "data: {}\n\n".format(data)
change_objects = [
WalkReporter(name="walk1", reportee=walks[0]),
... more objects to be tracked...
]
def event_stream(change_objects):
while True:
gevent.sleep(0.5)
for obj in change_objects:
if obj.changed():
yield obj.sse_event()
@app.route('/server_events')
def sse_request():
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
完全披露:这个问题是问题的后续:
它专注于在跟踪多个 "things" 的变化时重构 event_stream()
函数。
但是这里的问题显然超出了原始问题的范围,因此是一个新问题。
问题中重构的 "Version 2" 代码存在并发/时序问题。
sse_request()
为每个 Web 客户端调用(在测试用例 3 实例中)。因此,我们有 3 个实例在 event_stream()
.
中循环
这些调用"more or less"并行发生:这实际上意味着随机顺序。
但是列表 change_objects
是共享的,因此第一个发现更改的网络客户端会将共享 WalkReporter
实例中的 "old" 副本更新为最新状态,并且可以在其他客户端发现更改之前这样做。即第一个成功的网络客户端有效地隐藏了其他网络客户端的变化。
这很容易解决,方法是为每个 Web 客户端提供自己的 change_objects
副本。
即change_objects
移动到 sse_request()
中,如下所示。
@app.route('/server_events')
def sse_request():
change_objects = [
WalkReporter(name="walk1", reportee=walks[0]),
... more objects to be tracked...
]
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
有了这个微小的变化,sse_request()
的每个实例都可以发现变化,因此所有网络客户端都会按预期接收 sse 事件。
我有一个生成服务器发送事件 (sse) 的 Flask 网络服务器,所有连接的网络客户端都应该接收到这些事件。
在下面的“版本 1”中有效。所有网络客户端都会收到事件,并进行相应更新。
在下面的“版本 2”中,这是对版本 1 的重构,这不再按预期工作:
相反,我得到:
- 大多数情况下只有一个 Web 客户端获取事件,或者
- 很少 多个网络客户端获取事件,或者
- 很少 none 网络客户端获取事件
据我所知,服务器始终在生成事件,并且通常至少有一个客户端正在接收。
我的初始测试在 Raspberry Pi 3 上托管网络服务器,在 Pi 上使用网络客户端,在 Windows 和 OSX 上使用各种浏览器。
为了消除任何可能的网络问题,我对网络服务器和 3 个 Chrome 实例重复了相同的测试,这些实例都托管在同一台 OSX 笔记本电脑上。 这给出了相同的结果:版本 1 "OK"、版本 2 "NOT OK".
成功接收的客户端似乎因事件而异:到目前为止我无法辨别模式。
版本 1 和版本 2 的结构 change_objects
包含 "things that should be tracked for changes"
在版本 1 中
change_objects
是一个字典的字典。在第 2 版中,我将
change_objects
重构为 classReporter
的实例列表,或者 class 的子 [=] 15=].
对 "things" 的更改是根据代码中其他地方收到的网络服务触发的。
版本 1(确定:所有 Web 客户端都收到 sse 事件)
def check_walk(walk_new, walk_old):
if walk_new != walk_old:
print("walk change", walk_old, walk_new)
return True, walk_new
else:
return False, walk_old
def walk_event(walk):
silliness = walk['silliness']
data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
return "data: {}\n\n".format(data)
change_objects = {
"walk1": {
"object": walks[0],
"checker": check_walk,
"event": walk_event,
},
... more things to be tracked...
}
def event_stream(change_objects):
copies = {}
for key, value in change_objects.items():
copies[key] = {"obj_old": deepcopy(value["object"])} # ensure a true copy, not a reference!
while True:
gevent.sleep(0.5)
for key, value in change_objects.items():
obj_new = deepcopy(value["object"]) # use same version in check and yield functions
obj_changed, copies[key]["obj_old"] = value["checker"](obj_new, copies[key]["obj_old"])
if (obj_changed):
yield value["event"](obj_new)
@app.route('/server_events')
def sse_request():
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
版本 2(不正常:sse 事件并不总是被所有网络客户端接收)
class Reporter:
def __init__(self, reportee, name):
self._setup(reportee, name)
def _setup(self, reportee, name):
self.old = self.truecopy(reportee)
self.new = reportee
self.name = "{}_change".format(name)
def truecopy(self, orig):
return deepcopy(orig)
def changed(self):
if self.new != self.old:
self.old = self.truecopy(self.new)
return True
else:
return False
def sse_event(self):
data = self.new.copy()
data['type'] = self.name
data = json.dumps(data)
return "data: {}\n\n".format(data)
class WalkReporter(Reporter):
# as we are only interested in changes to attribute "silliness" (not other attributes) --> override superclass sse_event
def sse_event(self):
silliness = self.new['silliness']
data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
return "data: {}\n\n".format(data)
change_objects = [
WalkReporter(name="walk1", reportee=walks[0]),
... more objects to be tracked...
]
def event_stream(change_objects):
while True:
gevent.sleep(0.5)
for obj in change_objects:
if obj.changed():
yield obj.sse_event()
@app.route('/server_events')
def sse_request():
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
完全披露:这个问题是问题的后续:event_stream()
函数。
但是这里的问题显然超出了原始问题的范围,因此是一个新问题。
问题中重构的 "Version 2" 代码存在并发/时序问题。
sse_request()
为每个 Web 客户端调用(在测试用例 3 实例中)。因此,我们有 3 个实例在 event_stream()
.
这些调用"more or less"并行发生:这实际上意味着随机顺序。
但是列表 change_objects
是共享的,因此第一个发现更改的网络客户端会将共享 WalkReporter
实例中的 "old" 副本更新为最新状态,并且可以在其他客户端发现更改之前这样做。即第一个成功的网络客户端有效地隐藏了其他网络客户端的变化。
这很容易解决,方法是为每个 Web 客户端提供自己的 change_objects
副本。
即change_objects
移动到 sse_request()
中,如下所示。
@app.route('/server_events')
def sse_request():
change_objects = [
WalkReporter(name="walk1", reportee=walks[0]),
... more objects to be tracked...
]
return Response(
event_stream(change_objects),
mimetype='text/event-stream')
有了这个微小的变化,sse_request()
的每个实例都可以发现变化,因此所有网络客户端都会按预期接收 sse 事件。