Tornado 异步协程

Tornado asynchronous coroutine

我已经很久没用 Tornado 了。我想要一个 websocket,它能从运行 Tornado 的主机上的串口设备获取更新。所以我尝试把 multiprocessing 与 Tornado 结合使用,但子进程无法访问 Tornado 的 websocket。我又试着把它改写成协程,但这个协程似乎并没有运行起来。

class WebApplication(tornado.web.Application):
    """Tornado application with index, config, shutdown, websocket and static routes.

    Also defines ``serial_reader``, a coroutine that polls ``sh`` and writes
    whatever it read to every connection in ``WebSocketHandler.connections``.
    """

    def __init__(self):
        # Route table; the catch-all static-file pattern is listed last.
        handlers = [
            (r'/', IndexPageHandler),
            (r"/config", ConfigHandler),
            (r"/shutdown", ShutdownHandler),
            (r'/websocket', WebSocketHandler),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
        ]

        # `debug` and `resourcesWeb` are names defined outside this block.
        settings = {
            'debug': debug,
            'static_path': resourcesWeb,
            'template_path': 'templates'
        }
        tornado.web.Application.__init__(self, handlers, **settings)

    @gen.coroutine
    def serial_reader(self):
        """Loop forever: read from ``sh``, forward non-empty data, sleep 0.3 s.

        NOTE(review): nothing in this block calls ``serial_reader``; a
        coroutine that is never called or scheduled does not run - confirm
        where it is started.
        """
        log('serial_reader: start')
        # `done` is never set to True below, so the loop has no normal exit
        # and the final 'exit' log line is not reached by normal flow.
        done = False
        while not done:
            # NOTE(review): called directly inside the coroutine; if this
            # call blocks, the IOLoop is stalled for as long as it blocks.
            sh.read()
            serial_data_from = str(sh.data)
            if len(serial_data_from) > 0:
                if debug:
                    log('serial read:' + serial_data_from)
                    # NOTE(review): this write is nested under `if debug:`,
                    # so clients only receive data when `debug` is truthy -
                    # confirm whether that indentation is intended.
                    yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
            # Pause between polls.
            yield gen.sleep(0.3)
        log('serial_reader: exit')

Python 3.8.5,Tornado 6.1

我该如何用来自 Tornado 应用程序外部的数据正确且持续地更新 websocket?

由于 sh.read 是阻塞调用,您需要在执行器(executor)中运行它。然后,要在主线程中通知客户端,您需要使用 IOLoop.add_callback(它可以从任何线程安全地调用)。这也意味着 reader 方法变成了常规的同步方法。

示例:

from concurrent.futures import ThreadPoolExecutor
import functools

from tornado import web, websocket, ioloop

# Use print as the logging function in this example.
log = print


class IndexHandler(web.RequestHandler):
    """Serve the demo page: a history textarea, start/stop links and a websocket client."""

    # Page markup; the single ``%s`` receives the already-escaped history.
    # The backslash in the JavaScript ``"\\n"`` is doubled on purpose: with a
    # single backslash Python would emit a real line break inside the
    # JavaScript string literal, which is a syntax error that disables the
    # whole <script> block.
    # The websocket URL is derived from the page's own location so that the
    # Origin and Host headers match (Tornado's default origin check rejects
    # the handshake otherwise) and the page also works when it is not opened
    # via ``localhost:8888``.
    _PAGE_TEMPLATE = """<html>
            <textarea cols="30" rows="10" id="output">%s</textarea><br />
            <a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
            <a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
            <iframe name="f" width="100" height="30"></iframe>
            <script>
                ws = new WebSocket((location.protocol === "https:" ? "wss://" : "ws://") + location.host + "/stream");
                out_el = document.getElementById("output");
                function log(data) {out_el.value = data + "\\n" + out_el.value;}
                ws.onmessage = function (ev) {log(ev.data);}
            </script>"""

    def get(self):
        """Write the page with the values read so far, newest first."""
        import html

        history = "\n".join(map(str, reversed(self.application.read_data)))
        # Escape so that a value containing markup (e.g. "</textarea>")
        # cannot break out of the textarea.
        self.write(self._PAGE_TEMPLATE % html.escape(history, quote=False))


class StartHandler(web.RequestHandler):
    """Request handler whose GET asks the application to start its reader."""

    def get(self):
        """Start the reader and reply with a short plain-text confirmation."""
        app = self.application
        app.start_reader()
        self.write("Started")


class StopHandler(web.RequestHandler):
    """Request handler whose GET asks the application to stop its reader."""

    def get(self):
        """Stop the reader and reply with a short plain-text confirmation."""
        app = self.application
        app.stop_reader()
        self.write("Stopped")


class WebSocketHandler(websocket.WebSocketHandler):
    """Websocket endpoint that records every open connection in a class-level set."""

    # One set shared by all handler instances.
    connections = set()

    def open(self):
        """Register this connection."""
        WebSocketHandler.connections.add(self)

    def on_close(self):
        """Unregister this connection; does nothing if it is not registered."""
        WebSocketHandler.connections.discard(self)


class WebApplication(web.Application):
    """Tornado application that pushes values from a background reader to websocket clients.

    The blocking ``reader`` loop runs on a single-thread executor. Each value
    it produces is handed to the IOLoop with ``add_callback``, so
    ``notify_clients`` runs on the IOLoop thread and writes to every
    connection in ``WebSocketHandler.connections``.

    Args:
        autostart: When true, start the reader during construction.
    """

    def __init__(self, autostart=False):
        handlers = [
            (r"/", IndexHandler),
            (r"/start", StartHandler),
            (r"/stop", StopHandler),
            (r'/stream', WebSocketHandler),
        ]
        super().__init__(handlers)
        # A single worker thread: at most one reader loop runs at a time.
        self._reader_executor = ThreadPoolExecutor(1)
        self._keep_reading = False
        self._reader_future = None
        # Every value read so far, oldest first.
        # NOTE(review): this list is never trimmed, so it grows for as long
        # as the reader runs - consider bounding it for long-lived processes.
        self.read_data = []
        if autostart:
            self.start_reader()

    def _reader_is_active(self):
        """Return True while a reader has been requested and its future is unfinished."""
        future = self._reader_future
        return bool(self._keep_reading) and future is not None and not future.done()

    def start_reader(self):
        """Submit ``reader`` to the executor unless one is already active.

        A reader whose future has finished while ``_keep_reading`` is still
        set (i.e. it ended with an exception) does not count as active, so
        calling this again restarts it.
        """
        if self._reader_is_active():
            return
        self._keep_reading = True
        loop = ioloop.IOLoop.current()
        self._reader_future = loop.run_in_executor(
            self._reader_executor, functools.partial(self.reader, loop)
        )

    def stop_reader(self):
        """Ask the reader loop to finish; does nothing if it was not started."""
        if not self._keep_reading:
            return
        # The worker thread cannot be interrupted in the middle of a read;
        # it exits the next time its loop condition checks this flag.
        self._keep_reading = False
        if self._reader_future is not None:
            self._reader_future.cancel()

    def notify_clients(self, data=None):
        """Send ``data`` (formatted as text) to every open websocket connection.

        A failure on one connection is logged, including its cause, and does
        not prevent delivery to the remaining connections.
        """
        for con in WebSocketHandler.connections:
            try:
                con.write_message("{}".format(data))
            except Exception as ex:
                log("error sending to {}: {!r}".format(con, ex))

    def reader(self, main_loop):
        """Blocking read loop, intended to run on the executor thread.

        Args:
            main_loop: The IOLoop on which ``notify_clients`` is scheduled
                for each value read.
        """
        import random
        import time
        while self._keep_reading:
            time.sleep(1 + random.random())  # simulate read - block for some time
            data = random.getrandbits(32)
            print("reader: data={}".format(data))
            # Falsy values (here: 0) are neither broadcast nor recorded.
            if data:
                # add_callback hands the work over to the IOLoop thread.
                main_loop.add_callback(self.notify_clients, data)
                self.read_data.append(data)
            time.sleep(0.1)


if __name__ == "__main__":
    # Create the application with its reader already started and listen on
    # port 8888.
    app = WebApplication(autostart=True)
    app.listen(8888)
    loop = ioloop.IOLoop.current()
    try:
        loop.start()
    except KeyboardInterrupt:
        # On interrupt: stop the reader, close every open websocket, then
        # stop the loop.
        app.stop_reader()
        for open_socket in WebSocketHandler.connections:
            open_socket.close()
        loop.stop()