Tornado 异步协程
Tornado asynchronous coroutine
好久没用tornado了。我想要一个 websocket,它从运行龙卷风的主机的串行设备获取更新。所以我尝试使用 tornado 进行多处理,但该进程无法访问 tornado websocket。我试图将它合并为协程,但似乎并没有产生。
class WebApplication(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', IndexPageHandler),
(r"/config", ConfigHandler),
(r"/shutdown", ShutdownHandler),
(r'/websocket', WebSocketHandler),
(r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
]
settings = {
'debug': debug,
'static_path': resourcesWeb,
'template_path': 'templates'
}
tornado.web.Application.__init__(self, handlers, **settings)
@gen.coroutine
def serial_reader(self):
log('serial_reader: start')
done = False
while not done:
sh.read()
serial_data_from = str(sh.data)
if len(serial_data_from) > 0:
if debug:
log('serial read:' + serial_data_from)
yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
yield gen.sleep(0.3)
log('serial_reader: exit')
Python 3.8.5,龙卷风 6.1
我如何使用来自 Tornado 应用程序外部的数据正确且不断地更新 websocket
由于 sh.read
正在阻塞,您需要在执行程序中 运行 它。然后要在主线程中通知客户端,您需要使用 IOLoop.add_callback
(可以安全地从任何线程调用)。这也意味着 reader 方法成为常规同步方法。
示例:
from concurrent.futures import ThreadPoolExecutor
import functools
from tornado import web, websocket, ioloop
log = print
class IndexHandler(web.RequestHandler):
def get(self):
self.write("""<html>
<textarea cols="30" rows="10" id="output">%s</textarea><br />
<a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
<a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
<iframe name="f" width="100" height="30"></iframe>
<script>
ws = new WebSocket("ws://localhost:8888/stream");
out_el = document.getElementById("output");
function log(data) {out_el.value = data + "\n" + out_el.value;}
ws.onmessage = function (ev) {log(ev.data);}
</script>""" % "\n".join(map(str, reversed(self.application.read_data))))
class StartHandler(web.RequestHandler):
def get(self):
self.application.start_reader()
self.write("Started")
class StopHandler(web.RequestHandler):
def get(self):
self.application.stop_reader()
self.write("Stopped")
class WebSocketHandler(websocket.WebSocketHandler):
connections = set()
def open(self):
WebSocketHandler.connections.add(self)
def on_close(self):
if self in WebSocketHandler.connections:
WebSocketHandler.connections.remove(self)
class WebApplication(web.Application):
def __init__(self, autostart=False):
handlers = [
(r"/", IndexHandler),
(r"/start", StartHandler),
(r"/stop", StopHandler),
(r'/stream', WebSocketHandler),
]
web.Application.__init__(self, handlers)
self._reader_executor = ThreadPoolExecutor(1)
self._keep_reading = None
self.read_data = []
if autostart:
self.start_reader()
def start_reader(self):
if not self._keep_reading:
self._keep_reading = True
loop = ioloop.IOLoop.current()
self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
def stop_reader(self):
if self._keep_reading:
self._keep_reading = False
self._reader_future.cancel()
def notify_clients(self, data=None):
for con in WebSocketHandler.connections:
try:
con.write_message("{}".format(data))
except Exception as ex:
log("error sending to {}".format(con))
def reader(self, main_loop):
import random
import time
while self._keep_reading:
time.sleep(1 + random.random()) # simulate read - block for some time
data = random.getrandbits(32)
print("reader: data={}".format(data))
if data:
main_loop.add_callback(self.notify_clients, data)
self.read_data.append(data)
time.sleep(0.1)
if __name__ == "__main__":
app = WebApplication(True)
app.listen(8888)
loop = ioloop.IOLoop.current()
try:
loop.start()
except KeyboardInterrupt as ex:
app.stop_reader()
for con in WebSocketHandler.connections:
con.close()
loop.stop()
好久没用tornado了。我想要一个 websocket,它从运行龙卷风的主机的串行设备获取更新。所以我尝试使用 tornado 进行多处理,但该进程无法访问 tornado websocket。我试图将它合并为协程,但似乎并没有产生。
class WebApplication(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', IndexPageHandler),
(r"/config", ConfigHandler),
(r"/shutdown", ShutdownHandler),
(r'/websocket', WebSocketHandler),
(r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
]
settings = {
'debug': debug,
'static_path': resourcesWeb,
'template_path': 'templates'
}
tornado.web.Application.__init__(self, handlers, **settings)
@gen.coroutine
def serial_reader(self):
log('serial_reader: start')
done = False
while not done:
sh.read()
serial_data_from = str(sh.data)
if len(serial_data_from) > 0:
if debug:
log('serial read:' + serial_data_from)
yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
yield gen.sleep(0.3)
log('serial_reader: exit')
Python 3.8.5,龙卷风 6.1
我如何使用来自 Tornado 应用程序外部的数据正确且不断地更新 websocket
由于 sh.read
正在阻塞,您需要在执行程序中 运行 它。然后要在主线程中通知客户端,您需要使用 IOLoop.add_callback
(可以安全地从任何线程调用)。这也意味着 reader 方法成为常规同步方法。
示例:
from concurrent.futures import ThreadPoolExecutor
import functools
from tornado import web, websocket, ioloop
log = print
class IndexHandler(web.RequestHandler):
def get(self):
self.write("""<html>
<textarea cols="30" rows="10" id="output">%s</textarea><br />
<a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
<a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
<iframe name="f" width="100" height="30"></iframe>
<script>
ws = new WebSocket("ws://localhost:8888/stream");
out_el = document.getElementById("output");
function log(data) {out_el.value = data + "\n" + out_el.value;}
ws.onmessage = function (ev) {log(ev.data);}
</script>""" % "\n".join(map(str, reversed(self.application.read_data))))
class StartHandler(web.RequestHandler):
def get(self):
self.application.start_reader()
self.write("Started")
class StopHandler(web.RequestHandler):
def get(self):
self.application.stop_reader()
self.write("Stopped")
class WebSocketHandler(websocket.WebSocketHandler):
connections = set()
def open(self):
WebSocketHandler.connections.add(self)
def on_close(self):
if self in WebSocketHandler.connections:
WebSocketHandler.connections.remove(self)
class WebApplication(web.Application):
def __init__(self, autostart=False):
handlers = [
(r"/", IndexHandler),
(r"/start", StartHandler),
(r"/stop", StopHandler),
(r'/stream', WebSocketHandler),
]
web.Application.__init__(self, handlers)
self._reader_executor = ThreadPoolExecutor(1)
self._keep_reading = None
self.read_data = []
if autostart:
self.start_reader()
def start_reader(self):
if not self._keep_reading:
self._keep_reading = True
loop = ioloop.IOLoop.current()
self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
def stop_reader(self):
if self._keep_reading:
self._keep_reading = False
self._reader_future.cancel()
def notify_clients(self, data=None):
for con in WebSocketHandler.connections:
try:
con.write_message("{}".format(data))
except Exception as ex:
log("error sending to {}".format(con))
def reader(self, main_loop):
import random
import time
while self._keep_reading:
time.sleep(1 + random.random()) # simulate read - block for some time
data = random.getrandbits(32)
print("reader: data={}".format(data))
if data:
main_loop.add_callback(self.notify_clients, data)
self.read_data.append(data)
time.sleep(0.1)
if __name__ == "__main__":
app = WebApplication(True)
app.listen(8888)
loop = ioloop.IOLoop.current()
try:
loop.start()
except KeyboardInterrupt as ex:
app.stop_reader()
for con in WebSocketHandler.connections:
con.close()
loop.stop()