Python Tornado KeyError 从客户集中删除客户时
Python Tornado KeyError When removing client from clients set
我有一个 Python Tornado Websocket 服务器,它将客户端存储在共享集 () 中,以便我知道有多少客户端已连接。
挑战在于,在 WebSocketClosedError
之后调用 on_close
会引发 KeyError
并且客户端实例不会从连接的客户端集中删除。这个错误导致我的服务器累积了超过 1000 个客户端,即使活动客户端只有 5 个左右。
我的代码:
import tornado.iostream
import tornado.websocket
import asyncio
class SocketHandler(tornado.websocket.WebSocketHandler):
socket_active_message = {"status": "Socket Connection Active"}
waiters = set()
def initialize(self):
self.client_name = "newly_connected"
def open(self):
print('connection opened')
# https://kite.com/python/docs/tornado.websocket.WebSocketHandler.set_nodelay
self.set_nodelay(True)
SocketHandler.waiters.add(self)
def on_close(self):
print("CLOSED!", self.client_name)
SocketHandler.waiters.remove(self)
def check_origin(self, origin):
# Override the origin check if needed
return True
async def send_updates(self, message):
print('starting socket service loop')
loop_counter = 0
while True:
try:
await self.write_message({'status': 82317581})
except tornado.websocket.WebSocketClosedError:
self.on_close()
except tornado.iostream.StreamClosedError:
self.on_close()
except Exception as e:
self.on_close()
print('Exception e:', self.client_name)
await asyncio.sleep(0.05)
async def on_message(self, message):
print("RECEIVED :", message)
self.client_name = message
await self.send_updates(message)
def run_server():
# Create tornado application and supply URL routes
webApp = tornado.web.Application(
[
(
r"/",
SocketHandler,
{},
),
]
)
application = tornado.httpserver.HTTPServer(webApp)
webApp.listen(3433)
# Start IO/Event loop
tornado.ioloop.IOLoop.instance().start()
run_server()
堆栈跟踪:
Traceback (most recent call last):
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
result = await result
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 278, in get
await self.ws_connection.accept_connection(self)
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 881, in accept_connection
await self._accept_connection(handler)
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 964, in _accept_connection
await self._receive_frame_loop()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1118, in _receive_frame_loop
await self._receive_frame()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1209, in _receive_frame
await handled_future
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 658, in <lambda>
self.stream.io_loop.add_future(result, lambda f: f.result())
File "ask_So.py", line 50, in on_message
await self.send_updates(message)
File "ask_So.py", line 39, in send_updates
self.on_close()
File "ask_So.py", line 26, in on_close
SocketHandler.waiters.remove(self)
KeyError: <__main__.SocketHandler object at 0x7ffef9f25520>
我试过将服务员设置移到 class 之外,但它仍然产生相同的行为。
模拟WebSocketClosedError
:作为客户端打开多个浏览器选项卡,一次关闭一个浏览器选项卡。
似乎 self.on_close()
被调用了两次。一旦您从 send_updates()
内部手动调用它,稍后当连接实际关闭时,Tornado 也会调用 self.on_close()
。由于 self
对象已经第一次从集合中移除,它第二次引发 KeyError
。
如果要关闭连接,只需调用self.close()
。 self.on_close()
方法将由 Tornado 自动调用。
此外,您可以在 on_close
内的 try...except
块中处理异常。
更新
这个答案的前一部分应该解决 KeyError
相关的问题。此更新是关于为什么没有从 waiters
集合中删除客户端。
所以,我测试了你的代码并发现了一个主要问题:
async def on_message(self, message):
print("RECEIVED :", message)
self.client_name = message
await self.send_updates(message) # <- This is problematic
每当客户端发送消息时,它都会运行 self.send_updates
方法。所以即使只有一个客户端发送一条消息,比方说,10 次,send_updates
也会被调用 10 次,结果,你将有 10 个 while
循环运行同时宁!
随着循环次数的增加,最终会阻塞服务器。这意味着 Tornado 没有时间处理 运行 其他代码,因为它正忙于处理如此多的 while
循环。因此,来自 waiters
的客户端永远不会被删除。
解决方案
您可以只调用一次,而不是每次收到消息时都调用 send_updates
。只需一个 while
循环即可向所有客户端发送更新。
我会像这样更新代码:
class SocketHandler(...):
# Make it a classmethod so that it can be
# called without an instance
@classmethod
async def send_updates(cls):
print('starting socket service loop')
loop_counter = 0
while True:
for waiter in cls.waiters:
# use `waiter` instead of `self`
try:
await waiter.write_message({'status': 82317581})
...
await asyncio.sleep(0.05)
而不是从 on_message
调用 send_updates
,你必须调用 IOLoop 来调用它一次:
def run_server():
...
# schedule SocketHandler.send_updates to be run
tornado.ioloop.IOLoop.current().add_callback(SocketHandler.send_updates)
tornado.ioloop.IOLoop.current().start()
对于所有客户端,这将只有一个 while
循环 运行ning。
我有一个 Python Tornado Websocket 服务器,它将客户端存储在共享集 () 中,以便我知道有多少客户端已连接。
挑战在于,在 WebSocketClosedError
之后调用 on_close
会引发 KeyError
并且客户端实例不会从连接的客户端集中删除。这个错误导致我的服务器累积了超过 1000 个客户端,即使活动客户端只有 5 个左右。
我的代码:
import tornado.iostream
import tornado.websocket
import asyncio
class SocketHandler(tornado.websocket.WebSocketHandler):
socket_active_message = {"status": "Socket Connection Active"}
waiters = set()
def initialize(self):
self.client_name = "newly_connected"
def open(self):
print('connection opened')
# https://kite.com/python/docs/tornado.websocket.WebSocketHandler.set_nodelay
self.set_nodelay(True)
SocketHandler.waiters.add(self)
def on_close(self):
print("CLOSED!", self.client_name)
SocketHandler.waiters.remove(self)
def check_origin(self, origin):
# Override the origin check if needed
return True
async def send_updates(self, message):
print('starting socket service loop')
loop_counter = 0
while True:
try:
await self.write_message({'status': 82317581})
except tornado.websocket.WebSocketClosedError:
self.on_close()
except tornado.iostream.StreamClosedError:
self.on_close()
except Exception as e:
self.on_close()
print('Exception e:', self.client_name)
await asyncio.sleep(0.05)
async def on_message(self, message):
print("RECEIVED :", message)
self.client_name = message
await self.send_updates(message)
def run_server():
# Create tornado application and supply URL routes
webApp = tornado.web.Application(
[
(
r"/",
SocketHandler,
{},
),
]
)
application = tornado.httpserver.HTTPServer(webApp)
webApp.listen(3433)
# Start IO/Event loop
tornado.ioloop.IOLoop.instance().start()
run_server()
堆栈跟踪:
Traceback (most recent call last):
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
result = await result
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 278, in get
await self.ws_connection.accept_connection(self)
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 881, in accept_connection
await self._accept_connection(handler)
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 964, in _accept_connection
await self._receive_frame_loop()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1118, in _receive_frame_loop
await self._receive_frame()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1209, in _receive_frame
await handled_future
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 658, in <lambda>
self.stream.io_loop.add_future(result, lambda f: f.result())
File "ask_So.py", line 50, in on_message
await self.send_updates(message)
File "ask_So.py", line 39, in send_updates
self.on_close()
File "ask_So.py", line 26, in on_close
SocketHandler.waiters.remove(self)
KeyError: <__main__.SocketHandler object at 0x7ffef9f25520>
我试过将服务员设置移到 class 之外,但它仍然产生相同的行为。
模拟WebSocketClosedError
:作为客户端打开多个浏览器选项卡,一次关闭一个浏览器选项卡。
似乎 self.on_close()
被调用了两次。一旦您从 send_updates()
内部手动调用它,稍后当连接实际关闭时,Tornado 也会调用 self.on_close()
。由于 self
对象已经第一次从集合中移除,它第二次引发 KeyError
。
如果要关闭连接,只需调用self.close()
。 self.on_close()
方法将由 Tornado 自动调用。
此外,您可以在 on_close
内的 try...except
块中处理异常。
更新
这个答案的前一部分应该解决 KeyError
相关的问题。此更新是关于为什么没有从 waiters
集合中删除客户端。
所以,我测试了你的代码并发现了一个主要问题:
async def on_message(self, message):
print("RECEIVED :", message)
self.client_name = message
await self.send_updates(message) # <- This is problematic
每当客户端发送消息时,它都会运行 self.send_updates
方法。所以即使只有一个客户端发送一条消息,比方说,10 次,send_updates
也会被调用 10 次,结果,你将有 10 个 while
循环运行同时宁!
随着循环次数的增加,最终会阻塞服务器。这意味着 Tornado 没有时间处理 运行 其他代码,因为它正忙于处理如此多的 while
循环。因此,来自 waiters
的客户端永远不会被删除。
解决方案
您可以只调用一次,而不是每次收到消息时都调用 send_updates
。只需一个 while
循环即可向所有客户端发送更新。
我会像这样更新代码:
class SocketHandler(...):
# Make it a classmethod so that it can be
# called without an instance
@classmethod
async def send_updates(cls):
print('starting socket service loop')
loop_counter = 0
while True:
for waiter in cls.waiters:
# use `waiter` instead of `self`
try:
await waiter.write_message({'status': 82317581})
...
await asyncio.sleep(0.05)
而不是从 on_message
调用 send_updates
,你必须调用 IOLoop 来调用它一次:
def run_server():
...
# schedule SocketHandler.send_updates to be run
tornado.ioloop.IOLoop.current().add_callback(SocketHandler.send_updates)
tornado.ioloop.IOLoop.current().start()
对于所有客户端,这将只有一个 while
循环 运行ning。