Python Tornado KeyError 从客户集中删除客户时

Python Tornado KeyError When removing client from clients set

我有一个 Python Tornado Websocket 服务器,它将客户端存储在共享集 () 中,以便我知道有多少客户端已连接。

挑战在于,在 WebSocketClosedError 之后调用 on_close 会引发 KeyError 并且客户端实例不会从连接的客户端集中删除。这个错误导致我的服务器累积了超过 1000 个客户端,即使活动客户端只有 5 个左右。

我的代码:

import tornado.iostream
import tornado.websocket
import asyncio


class SocketHandler(tornado.websocket.WebSocketHandler):
    socket_active_message = {"status": "Socket Connection Active"}
    waiters = set()

    def initialize(self):
        self.client_name = "newly_connected"

    def open(self):
        print('connection opened')
        # https://kite.com/python/docs/tornado.websocket.WebSocketHandler.set_nodelay
        self.set_nodelay(True)
        SocketHandler.waiters.add(self)

    def on_close(self):
        print("CLOSED!", self.client_name)
        SocketHandler.waiters.remove(self)

    def check_origin(self, origin):
        # Override the origin check if needed
        return True

    async def send_updates(self, message):
        print('starting socket service loop')
        loop_counter = 0
        while True:
            try:
                await self.write_message({'status': 82317581})
            except tornado.websocket.WebSocketClosedError:
                self.on_close()
            except tornado.iostream.StreamClosedError:
                self.on_close()
            except Exception as e:
                self.on_close()
                print('Exception e:', self.client_name)
            await asyncio.sleep(0.05)

    async def on_message(self, message):
        print("RECEIVED :", message)
        self.client_name = message
        await self.send_updates(message)


def run_server():
    # Create tornado application and supply URL routes
    webApp = tornado.web.Application(
        [
            (
                r"/",
                SocketHandler,
                {},
            ),
        ]
    )

    application = tornado.httpserver.HTTPServer(webApp)
    webApp.listen(3433)
    # Start IO/Event loop
    tornado.ioloop.IOLoop.instance().start()


run_server()

堆栈跟踪:

Traceback (most recent call last):
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
    result = await result
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 278, in get
    await self.ws_connection.accept_connection(self)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 881, in accept_connection
    await self._accept_connection(handler)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 964, in _accept_connection
    await self._receive_frame_loop()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1118, in _receive_frame_loop
    await self._receive_frame()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1209, in _receive_frame
    await handled_future
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 658, in <lambda>
    self.stream.io_loop.add_future(result, lambda f: f.result())
  File "ask_So.py", line 50, in on_message
    await self.send_updates(message)
  File "ask_So.py", line 39, in send_updates
    self.on_close()
  File "ask_So.py", line 26, in on_close
    SocketHandler.waiters.remove(self)
KeyError: <__main__.SocketHandler object at 0x7ffef9f25520>

我试过将服务员设置移到 class 之外,但它仍然产生相同的行为。

模拟WebSocketClosedError:作为客户端打开多个浏览器选项卡,一次关闭一个浏览器选项卡。

似乎 self.on_close() 被调用了两次。一旦您从 send_updates() 内部手动调用它,稍后当连接实际关闭时,Tornado 也会调用 self.on_close()。由于 self 对象已经第一次从集合中移除,它第二次引发 KeyError

如果要关闭连接,只需调用self.close()self.on_close() 方法将由 Tornado 自动调用。

此外,您可以在 on_close 内的 try...except 块中处理异常。


更新

这个答案的前一部分应该解决 KeyError 相关的问题。此更新是关于为什么没有从 waiters 集合中删除客户端。

所以,我测试了你的代码并发现了一个主要问题:

 async def on_message(self, message):
    print("RECEIVED :", message)
    self.client_name = message
    await self.send_updates(message) # <- This is problematic

每当客户端发送消息时,它都会运行 self.send_updates 方法。所以即使只有一个客户端发送一条消息,比方说,10 次,send_updates 也会被调用 10 次,结果,你将有 10 个 while 循环运行同时宁!

随着循环次数的增加,最终会阻塞服务器。这意味着 Tornado 没有时间处理 运行 其他代码,因为它正忙于处理如此多的 while 循环。因此,来自 waiters 的客户端永远不会被删除。

解决方案

您可以只调用一次,而不是每次收到消息时都调用 send_updates。只需一个 while 循环即可向所有客户端发送更新。

我会像这样更新代码:

class SocketHandler(...):
    # Make it a classmethod so that it can be 
    # called without an instance
    @classmethod
    async def send_updates(cls):
        print('starting socket service loop')
        loop_counter = 0
        while True:
            for waiter in cls.waiters:
                # use `waiter` instead of `self`
                try:
                    await waiter.write_message({'status': 82317581})
                    ...

            await asyncio.sleep(0.05)

而不是从 on_message 调用 send_updates,你必须调用 IOLoop 来调用它一次:

def run_server():
    ... 

    # schedule SocketHandler.send_updates to be run
    tornado.ioloop.IOLoop.current().add_callback(SocketHandler.send_updates)
    tornado.ioloop.IOLoop.current().start()

对于所有客户端,这将只有一个 while 循环 运行ning。