服务器发送的事件未被所有网络客户端接收

Server-sent-events not received by all web-clients

我有一个生成服务器发送事件 (sse) 的 Flask 网络服务器,所有连接的网络客户端都应该接收到这些事件。

在下面的“版本 1”中有效。所有网络客户端都会收到事件,并进行相应更新。

在下面的“版本 2”中,这是对版本 1 的重构,这不再按预期工作:

相反,我得到:

据我所知,服务器始终在生成事件,并且通常至少有一个客户端正在接收。

我的初始测试在 Raspberry Pi 3 上托管网络服务器,在 Pi 上使用网络客户端,在 Windows 和 OSX 上使用各种浏览器。

为了消除任何可能的网络问题,我对网络服务器和 3 个 Chrome 实例重复了相同的测试,这些实例都托管在同一台 OSX 笔记本电脑上。 这给出了相同的结果:版本 1 "OK"、版本 2 "NOT OK".

成功接收的客户端似乎因事件而异:到目前为止我无法辨别模式。

版本 1 和版本 2 的结构 change_objects 包含 "things that should be tracked for changes"

对 "things" 的更改是根据代码中其他地方收到的网络服务触发的。

版本 1(确定:所有 Web 客户端都收到 sse 事件)

def check_walk(walk_new, walk_old):
    if walk_new != walk_old:
        print("walk change", walk_old, walk_new)
        return True, walk_new
    else:
        return False, walk_old

def walk_event(walk):
    silliness = walk['silliness']
    data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
    return "data: {}\n\n".format(data)

change_objects = {
    "walk1": {
        "object": walks[0],
        "checker": check_walk,
        "event": walk_event,
    },
    ... more things to be tracked...
}

def event_stream(change_objects):
    copies = {}
    for key, value in change_objects.items():
        copies[key] = {"obj_old": deepcopy(value["object"])}  # ensure a true copy, not a reference!

    while True:
        gevent.sleep(0.5)
        for key, value in change_objects.items():
            obj_new = deepcopy(value["object"]) # use same version in check and yield functions
            obj_changed, copies[key]["obj_old"] = value["checker"](obj_new, copies[key]["obj_old"])
            if (obj_changed):
                yield value["event"](obj_new)

@app.route('/server_events')
def sse_request():
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

版本 2(不正常:sse 事件并不总是被所有网络客户端接收)

class Reporter:

    def __init__(self, reportee, name):
        self._setup(reportee, name)

    def _setup(self, reportee, name):
        self.old = self.truecopy(reportee)
        self.new = reportee
        self.name = "{}_change".format(name)

    def truecopy(self, orig):
        return deepcopy(orig)

    def changed(self):
        if self.new != self.old:
            self.old = self.truecopy(self.new)
            return True
        else:
            return False

    def sse_event(self):
        data = self.new.copy()
        data['type'] = self.name
        data = json.dumps(data)
        return "data: {}\n\n".format(data)

class WalkReporter(Reporter):

    # as we are only interested in changes to attribute "silliness" (not other attributes) --> override superclass sse_event
    def sse_event(self): 
        silliness = self.new['silliness']
        data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
        return "data: {}\n\n".format(data)

change_objects = [
    WalkReporter(name="walk1", reportee=walks[0]),
    ... more objects to be tracked...
] 

def event_stream(change_objects):
    while True:
        gevent.sleep(0.5)
        for obj in change_objects:
            if obj.changed():
                yield obj.sse_event()

@app.route('/server_events')
def sse_request():
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

完全披露:这个问题是问题的后续: 它专注于在跟踪多个 "things" 的变化时重构 event_stream() 函数。 但是这里的问题显然超出了原始问题的范围,因此是一个新问题。

问题中重构的 "Version 2" 代码存在并发/时序问题。

sse_request() 为每个 Web 客户端调用(在测试用例 3 实例中)。因此,我们有 3 个实例在 event_stream().

中循环

这些调用"more or less"并行发生:这实际上意味着随机顺序。

但是列表 change_objects 是共享的,因此第一个发现更改的网络客户端会将共享 WalkReporter 实例中的 "old" 副本更新为最新状态,并且可以在其他客户端发现更改之前这样做。即第一个成功的网络客户端有效地隐藏了其他网络客户端的变化。

这很容易解决,方法是为每个 Web 客户端提供自己的 change_objects 副本。

change_objects 移动到 sse_request() 中,如下所示。

@app.route('/server_events')
def sse_request():
    change_objects = [
        WalkReporter(name="walk1", reportee=walks[0]),
        ... more objects to be tracked...
    ]
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

有了这个微小的变化,sse_request() 的每个实例都可以发现变化,因此所有网络客户端都会按预期接收 sse 事件。