Python Tornado 从另一个线程发送 WebSocket 消息
Python Tornado send WebSocket messages from another thread
我想在 Python 中使用 WebSockets 来使 Web 客户端了解我使用 PySerial 从串行端口读取的数据的最新信息。我目前正在使用以下代码通过单独的线程连续读取串行数据
def read_from_port():
while running:
reading = ser.readline().decode()
handle_data(reading)
thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()
我正在对串行数据执行一些处理,然后如果计算结果与其先前的值不同,我想向所有连接的 WebSocket 客户端广播一条消息。为此,我设置了以下代码
clients = []
def Broadcast(message):
for client in clients:
client.sendMessage(json.dumps(message).encode('utf8'))
print("broadcasted")
worker.broadcast = Broadcast
class WSHandler(tornado.websocket.WebSocketHandler):
def open(self):
print('new connection')
clients.append(self)
def on_message(self, message):
print('message received: %s' % message)
response = handler.HandleRequest(message, self.write_message)
def on_close(self):
print('connection closed')
clients.remove(self)
def check_origin(self, origin):
return True
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8765)
myIP = socket.gethostbyname(socket.gethostname())
print('*** Websocket Server Started at %s***' % myIP)
tornado.ioloop.IOLoop.instance().start()
然后我想在worker中使用“broadcast”方法广播出一个结果。从工作线程使用此方法会产生以下错误
File "main.py", line 18, in Broadcast
client.write_message(message)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
return self.ws_connection.write_message(message, binary=binary)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
fut = self._write_frame(True, opcode, message, flags=flags)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
return self.stream.write(frame)
File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
future = Future() # type: Future[None]
File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
我知道问题是 Tornado write_message 函数不是线程安全的,并且产生此错误是因为我试图直接从工作线程调用该函数。据我所知,在 Tornado 中使用并发代码的推荐方法是通过 asyncio,但我认为在这种情况下线程方法可能更合适,因为我有一个基本上不断并行运行的循环。
不幸的是,我对 asyncio 以及如何在 Python 中实现线程知之甚少,所以我想找出从不同线程发送 WebSocket 消息的最简单方法是什么。
在 https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading 阅读关于同时使用 asyncio 和多线程的官方文档给了我必要的线索,我可以使用“call_soon_threadsafe”函数非常优雅地实现这一点。因此,以下代码似乎可以解决问题
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
clients = []
def bcint(message):
for client in clients:
client.write_message(message)
print("broadcasted")
def Broadcast(message):
io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)
worker.broadcast = Broadcast
class WSHandler(tornado.websocket.WebSocketHandler):
def open(self):
print('new connection')
clients.append(self)
def on_message(self, message):
print('message received: %s' % message)
response = handler.HandleRequest(message, self.write_message)
def on_close(self):
print('connection closed')
clients.remove(self)
def check_origin(self, origin):
return True
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8765)
myIP = socket.gethostbyname(socket.gethostname())
print('*** Websocket Server Started at %s***' % myIP)
tornado.ioloop.IOLoop.current().start()
一个更简洁的选择是使用 pyzmq 之类的队列,这将帮助您建立从一个线程到另一个线程的通信。
查看您的用例,您可以使用 PUB/SUB 模型。这里有一个sample code。此外,您可以使用 'inproc' 而不是 'tcp'。这将减少延迟,因为您将在同一进程中的多个线程之间进行通信。
我想在 Python 中使用 WebSockets 来使 Web 客户端了解我使用 PySerial 从串行端口读取的数据的最新信息。我目前正在使用以下代码通过单独的线程连续读取串行数据
def read_from_port():
while running:
reading = ser.readline().decode()
handle_data(reading)
thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()
我正在对串行数据执行一些处理,然后如果计算结果与其先前的值不同,我想向所有连接的 WebSocket 客户端广播一条消息。为此,我设置了以下代码
clients = []
def Broadcast(message):
for client in clients:
client.sendMessage(json.dumps(message).encode('utf8'))
print("broadcasted")
worker.broadcast = Broadcast
class WSHandler(tornado.websocket.WebSocketHandler):
def open(self):
print('new connection')
clients.append(self)
def on_message(self, message):
print('message received: %s' % message)
response = handler.HandleRequest(message, self.write_message)
def on_close(self):
print('connection closed')
clients.remove(self)
def check_origin(self, origin):
return True
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8765)
myIP = socket.gethostbyname(socket.gethostname())
print('*** Websocket Server Started at %s***' % myIP)
tornado.ioloop.IOLoop.instance().start()
然后我想在worker中使用“broadcast”方法广播出一个结果。从工作线程使用此方法会产生以下错误
File "main.py", line 18, in Broadcast
client.write_message(message)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
return self.ws_connection.write_message(message, binary=binary)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
fut = self._write_frame(True, opcode, message, flags=flags)
File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
return self.stream.write(frame)
File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
future = Future() # type: Future[None]
File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.
我知道问题是 Tornado write_message 函数不是线程安全的,并且产生此错误是因为我试图直接从工作线程调用该函数。据我所知,在 Tornado 中使用并发代码的推荐方法是通过 asyncio,但我认为在这种情况下线程方法可能更合适,因为我有一个基本上不断并行运行的循环。
不幸的是,我对 asyncio 以及如何在 Python 中实现线程知之甚少,所以我想找出从不同线程发送 WebSocket 消息的最简单方法是什么。
在 https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading 阅读关于同时使用 asyncio 和多线程的官方文档给了我必要的线索,我可以使用“call_soon_threadsafe”函数非常优雅地实现这一点。因此,以下代码似乎可以解决问题
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)
clients = []
def bcint(message):
for client in clients:
client.write_message(message)
print("broadcasted")
def Broadcast(message):
io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)
worker.broadcast = Broadcast
class WSHandler(tornado.websocket.WebSocketHandler):
def open(self):
print('new connection')
clients.append(self)
def on_message(self, message):
print('message received: %s' % message)
response = handler.HandleRequest(message, self.write_message)
def on_close(self):
print('connection closed')
clients.remove(self)
def check_origin(self, origin):
return True
application = tornado.web.Application([
(r'/ws', WSHandler),
])
if __name__ == "__main__":
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8765)
myIP = socket.gethostbyname(socket.gethostname())
print('*** Websocket Server Started at %s***' % myIP)
tornado.ioloop.IOLoop.current().start()
一个更简洁的选择是使用 pyzmq 之类的队列,这将帮助您建立从一个线程到另一个线程的通信。
查看您的用例,您可以使用 PUB/SUB 模型。这里有一个sample code。此外,您可以使用 'inproc' 而不是 'tcp'。这将减少延迟,因为您将在同一进程中的多个线程之间进行通信。