Python Tornado tcpserver 流写入结果

Python Tornado tcpserver stream write result

我有一个简单的 tornado tcp 服务器,有时会写入客户端流。

我想得到 stream.write() 的结果。它始终 returns None 无论流是否仍然处于活动状态。

我试过使用 stream.write() 回调和协程。

有没有办法实际获得 stream.write() 的结果?

这是代码。

import logging
import signal
import tornado.httpserver
import datetime
from tornado import gen


logger = logging.getLogger(__name__)


class Server(tornado.httpserver.TCPServer):
    def __init__(self, io_loop=None, ssl_options=None, **kwargs):

        logger.debug('tcp server started')

        if not io_loop:
            io_loop = tornado.ioloop.IOLoop.instance()

        self.io_loop = io_loop
        self.streams = []

        self.io_loop.add_timeout(datetime.timedelta(seconds=5), self.write_to_all_stream)

        tornado.httpserver.TCPServer.__init__(
            self, io_loop=io_loop, ssl_options=ssl_options, **kwargs
        )        


    def handle_stream(self, stream, address):
        logger.debug('New connection from %s' % str(address))
        self.streams.append(stream)


    def on_write_stream(self):
        logger.debug('written to stream')


    #@gen.coroutine
    def write_to_all_stream(self):
        logger.debug('writing to all streams')

        for s in self.streams:
            if s.closed():
                logger.debug('Stream is closed!')
            else:
                result = s.write('hello', self.on_write_stream)
                #result = yield s.write('hello')

                logger.debug('result: %s' % str(result))


def configure_signals():
    def bye_handler(signal, frame):
        logger.info('interrupt signal received, shutting down')

        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.stop()

    signal.signal(signal.SIGINT, bye_handler)
    signal.signal(signal.SIGTERM, bye_handler)


def configure_logging():
    logging.basicConfig(
        filename=None,
        level=logging.DEBUG,
        format='%(asctime)s: %(levelname)7s: [%(name)s]: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )


if __name__ == '__main__':
    configure_signals()
    configure_logging()

    loop = tornado.ioloop.IOLoop.instance()

    logger.debug('hello')

    s = Server().listen(12345, '127.0.0.1')

    loop.start()

    logger.debug('bye')

为了测试客户端流,我使用 nc ( netcat )。我用 CTRL + C / CTRL + Z

模拟断开连接
nc 127.0.0.1 12345

OS: Ubuntu 14.04

Python: 2.7.6

龙卷风版本:4.2.0

在深入挖掘并测试各种东西后,我找到了答案。

BaseIOStream.get_fd_error()

其中指出:

Returns information about any error on the underlying file.

This method is called after the IOLoop has signaled an error on the file descriptor, and should return an Exception (such as socket.error with additional information, or None if no such information is available.

http://tornado.readthedocs.org/en/latest/iostream.html#tornado.iostream.BaseIOStream.get_fd_error

这是上述代码的更新版本。如果发生流错误,它会检测到它们。

import logging
import signal
import tornado.httpserver
import datetime
import functools


logger = logging.getLogger(__name__)


class Server(tornado.httpserver.TCPServer):
    def __init__(self, io_loop=None, ssl_options=None, **kwargs):
        
        logger.debug('tcp server started')

        if not io_loop:
            io_loop = tornado.ioloop.IOLoop.instance()

        self.io_loop = io_loop
        self.streams = []
        
        self.io_loop.add_timeout(datetime.timedelta(seconds=5), self.write_to_all_stream)
        
        tornado.httpserver.TCPServer.__init__(
            self, io_loop=io_loop, ssl_options=ssl_options, **kwargs
        )        


    def handle_stream(self, stream, address):
        logger.debug('New connection from %s' % str(address))
        self.streams.append(stream)


    def on_write_stream(self, stream):
        status = stream.get_fd_error()
        
        if status[0]:
            logger.debug('Encountered an error when writing to stream: %s' % str(status[1]))
        else:
            logger.debug('Successfully written to stream')


    def write_to_all_stream(self):
        logger.debug('writing to all streams')
        
        for s in self.streams:
            s.write('hello', functools.partial(self.on_write_stream, s))

 
def configure_signals():
    def bye_handler(signal, frame):
        logger.info('interrupt signal received, shutting down')

        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.stop()

    signal.signal(signal.SIGINT, bye_handler)
    signal.signal(signal.SIGTERM, bye_handler)


def configure_logging():
    logging.basicConfig(
        filename=None,
        level=logging.DEBUG,
        format='%(asctime)s: %(levelname)7s: [%(name)s]: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )


if __name__ == '__main__':
    configure_signals()
    configure_logging()
    
    loop = tornado.ioloop.IOLoop.instance()
    
    logger.debug('hello')
    
    s = Server().listen(12345, '127.0.0.1')
    
    loop.start()
    
    logger.debug('bye')

更新:

stream.write() 对于 TCP 连接不是 100% 可靠的。 stream.get_fd_error() returns (0, 'Success') 当数据无误地放入OS队列但不保证它到达客户端。为此,您需要在应用级别实施确认系统。

更多细节在这里: