如何在 Tornado 异步 TCP 中同时处理多个命令以从套接字读取?

How to process multiple commands to read from socket at the same time in Tornado asynchronous TCP?

我的 TCP 服务器是用 Tornado 的异步 TCP 创建的。客户端是用C写的。

服务器代码:

#! /usr/bin/env python
#coding=utf-8

from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop

class TcpConnection(object):
    def __init__(self,stream,address):
        self._stream=stream
        self._address=address
        self._stream.set_close_callback(self.on_close)
        self.send_message(b'hello \n')
        self.send_message(b'world \n')

    def read_message(self):
        self._stream.read_until(b'\n', self.handle_message)

    def handle_message(self,data):
        print(data)

    def send_message(self,data):
        self._stream.write(data)
        self.read_message()

    def on_close(self):
        print("the monitored %d has left",self._address)

class MonitorServer(TCPServer):
    def handle_stream(self,stream,address):
        print("new connection",address,stream)
        TcpConnection(stream,address)

if  __name__=='__main__':
    print('server start .....')
    server=MonitorServer()
    server.listen(20000)
    IOLoop.instance().start()

客户代码:

#include <Winsock2.h>
#include <stdio.h>
#pragma comment(lib, "ws2_32.lib")

typedef struct SytemInit
{
    char computer[32];      
    char user[32];              
    char os[256];               
    char processor[256];    
    char mem[128];          
    char disk[128];             
}SYSTEMINIT;

typedef struct Command
{
    int  commandType;                             
    char commandInfo[256];                 
}COMMAND;

void main()
{
    int err;
    SYSTEMINIT message;
    COMMAND recvBuf;

    SOCKET sockClient; 
    SOCKADDR_IN addrServer; 

    WSADATA wsaData;
    WORD wVersionRequested;

    wVersionRequested = MAKEWORD( 2, 2 );

    err = WSAStartup( wVersionRequested, &wsaData );

    if ( err != 0 )
    {
        return;
    }

    if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
    {
        WSACleanup( );
        return;
    }

    sockClient = socket(AF_INET, SOCK_STREAM, 0);

    addrServer.sin_addr.S_un.S_addr = inet_addr("172.16.110.1");  
    addrServer.sin_family = AF_INET;                           
    addrServer.sin_port = htons(20000);                         

    connect(sockClient, (SOCKADDR *)&addrServer, sizeof(SOCKADDR));

    recv(sockClient, (char*)&recvBuf, 100, 0);

    strcpy(message.computer,"zz-pc");
    strcpy(message.disk,"zz-disk");
    strcpy(message.mem,"zz-men");
    strcpy(message.os,"zz-os");
    strcpy(message.processor,"zz-processor");
    strcpy(message.user,"zz-user");

    send(sockClient, (char*)&message, sizeof(message) + 1, 0);

    closesocket(sockClient);
    WSACleanup();
}

执行时出现以下错误:

ERROR:tornado.application:Error in connection callback
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/tornado/tcpserver.py", line 269, in _handle_connection
    future = self.handle_stream(stream, address)
  File "/home/zz/PycharmProjects/monitor/test.py", line 34, in handle_stream
    TcpConnection(stream,address)
  File "/home/zz/PycharmProjects/monitor/test.py", line 15, in __init__
    self.send_message(b'world \n')
  File "/home/zz/PycharmProjects/monitor/test.py", line 25, in send_message
    self.read_message()
  File "/home/zz/PycharmProjects/monitor/test.py", line 18, in read_message
    self._stream.read_until(b'\n', self.handle_message)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 270, in read_until
    future = self._set_read_callback(callback)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 658, in _set_read_callback
    assert self._read_callback is None, "Already reading"
AssertionError: Already reading

我猜这个错误是因为self.send_message(b'hello \n')self.send_message(b'world \n')同时从套接字读取。 我该如何解决这个问题?

你的这个代码:

self.send_message(b'hello \n')
self.send_message(b'world \n')

结果如下:

self._stream.write(b'hello \n')
self._stream.read_until(b'\n', self.handle_message)
self._stream.write(b'world \n')
self._stream.read_until(b'\n', self.handle_message)

由于您是通过回调调用 read_until,因此您正在尝试同时并行执行两个 read_until。不过,这是无稽之谈,因为它们是通过 TCP 连接一个接一个地到达的。您必须先阅读一条消息,然后再阅读另一条消息。

我觉得使用 gen.coroutine 会使这更容易。您也可以使用回调来完成;我稍后会展示如何。

使用gen.coroutine

以下是我如何使用协程更改您的 TcpConnection class:

class TcpConnection(object):
    def __init__(self,stream,address):
        self._stream=stream
        self._address=address
        self._stream.set_close_callback(self.on_close)

    @gen.coroutine
    def send_messages(self):
        yield self.send_message(b'hello \n')
        response1 = yield self.read_message()
        print(response1)
        yield self.send_message(b'world \n')
        # You can receive the result in-line, but you need to wrap with ( ):
        print((yield self.read_message()))

    def read_message(self):
        return self._stream.read_until(b'\n')

    def send_message(self,data):
        return self._stream.write(data)

    def on_close(self):
        print("the monitored %d has left",self._address)

class MonitorServer(TCPServer):
    @gen.coroutine
    def handle_stream(self,stream,address):
        print("new connection",address,stream)
        conn = TcpConnection(stream,address)
        yield conn.send_messages()

通过使用协同程序,您可以按照希望代码执行的顺序编写代码,并且可以像 return 值一样将响应读取到局部变量中,而不必使用处理程序方法。每当你 yield 做某事时,你都会停下来等待它完成。

我还把send_message()receive_message()分开了,因为我觉得这样更清楚。如果您认为将它们放在 send_message() 中更好,您可以这样做:

@gen.coroutine
def send_message(self,data):
    yield self._stream.write(data)
    return (yield self.receive_message())

如果您想先发送这两条消息,然后然后等待接收这两条消息,您也可以这样做:

@gen.coroutine
def send_messages(self):
    yield self.send_message(b'hello \n')
    yield self.send_message(b'world \n')
    print((yield self.read_message()))
    print((yield self.read_message()))

使用回调

任何你可以用协程编码的东西,你都可以用回调编码。但是,您需要做的是在回调之间跟踪您的状态(您所在的位置)。这可以通过在不同的回调之间跳来跳去来完成。例如:

def send_first_message(self):
    self.send_message(b'hello \n', self.receive_first_response)

def receive_first_response(self, data):
    print(data)
    self.send_message(b'world \n', self.receive_second_response)

def receive_second_response(self, data):
    print(data)

def read_message(self, callback):
    self._stream.read_until(b'\n', callback)

def send_message(self, data, callback):
    self._stream.write(data)
    self.read_message(callback)

或者通过其他方式跟踪您在通信中的位置,例如在您的 class 实例的字段中存储一些内容。