如何在 Tornado 异步 TCP 中同时处理多个命令以从套接字读取?
How to process multiple commands to read from socket at the same time in Tornado asynchronous TCP?
我的 TCP 服务器是用 Tornado 的异步 TCP 创建的。客户端是用C写的。
服务器代码:
#! /usr/bin/env python
#coding=utf-8
from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
self.send_message(b'hello \n')
self.send_message(b'world \n')
def read_message(self):
self._stream.read_until(b'\n', self.handle_message)
def handle_message(self,data):
print(data)
def send_message(self,data):
self._stream.write(data)
self.read_message()
def on_close(self):
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
def handle_stream(self,stream,address):
print("new connection",address,stream)
TcpConnection(stream,address)
if __name__=='__main__':
print('server start .....')
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
客户代码:
#include <Winsock2.h>
#include <stdio.h>
#pragma comment(lib, "ws2_32.lib")
typedef struct SytemInit
{
char computer[32];
char user[32];
char os[256];
char processor[256];
char mem[128];
char disk[128];
}SYSTEMINIT;
typedef struct Command
{
int commandType;
char commandInfo[256];
}COMMAND;
void main()
{
int err;
SYSTEMINIT message;
COMMAND recvBuf;
SOCKET sockClient;
SOCKADDR_IN addrServer;
WSADATA wsaData;
WORD wVersionRequested;
wVersionRequested = MAKEWORD( 2, 2 );
err = WSAStartup( wVersionRequested, &wsaData );
if ( err != 0 )
{
return;
}
if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
{
WSACleanup( );
return;
}
sockClient = socket(AF_INET, SOCK_STREAM, 0);
addrServer.sin_addr.S_un.S_addr = inet_addr("172.16.110.1");
addrServer.sin_family = AF_INET;
addrServer.sin_port = htons(20000);
connect(sockClient, (SOCKADDR *)&addrServer, sizeof(SOCKADDR));
recv(sockClient, (char*)&recvBuf, 100, 0);
strcpy(message.computer,"zz-pc");
strcpy(message.disk,"zz-disk");
strcpy(message.mem,"zz-men");
strcpy(message.os,"zz-os");
strcpy(message.processor,"zz-processor");
strcpy(message.user,"zz-user");
send(sockClient, (char*)&message, sizeof(message) + 1, 0);
closesocket(sockClient);
WSACleanup();
}
执行时出现以下错误:
ERROR:tornado.application:Error in connection callback
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/tornado/tcpserver.py", line 269, in _handle_connection
future = self.handle_stream(stream, address)
File "/home/zz/PycharmProjects/monitor/test.py", line 34, in handle_stream
TcpConnection(stream,address)
File "/home/zz/PycharmProjects/monitor/test.py", line 15, in __init__
self.send_message(b'world \n')
File "/home/zz/PycharmProjects/monitor/test.py", line 25, in send_message
self.read_message()
File "/home/zz/PycharmProjects/monitor/test.py", line 18, in read_message
self._stream.read_until(b'\n', self.handle_message)
File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 270, in read_until
future = self._set_read_callback(callback)
File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 658, in _set_read_callback
assert self._read_callback is None, "Already reading"
AssertionError: Already reading
我猜这个错误是因为self.send_message(b'hello \n')
和self.send_message(b'world \n')
同时从套接字读取。
我该如何解决这个问题?
你的这个代码:
self.send_message(b'hello \n')
self.send_message(b'world \n')
结果如下:
self._stream.write(b'hello \n')
self._stream.read_until(b'\n', self.handle_message)
self._stream.write(b'world \n')
self._stream.read_until(b'\n', self.handle_message)
由于您是通过回调调用 read_until
,因此您正在尝试同时并行执行两个 read_until
。不过,这是无稽之谈,因为它们是通过 TCP 连接一个接一个地到达的。您必须先阅读一条消息,然后再阅读另一条消息。
我觉得使用 gen.coroutine
会使这更容易。您也可以使用回调来完成;我稍后会展示如何。
使用gen.coroutine
以下是我如何使用协程更改您的 TcpConnection
class:
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
@gen.coroutine
def send_messages(self):
yield self.send_message(b'hello \n')
response1 = yield self.read_message()
print(response1)
yield self.send_message(b'world \n')
# You can receive the result in-line, but you need to wrap with ( ):
print((yield self.read_message()))
def read_message(self):
return self._stream.read_until(b'\n')
def send_message(self,data):
return self._stream.write(data)
def on_close(self):
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
@gen.coroutine
def handle_stream(self,stream,address):
print("new connection",address,stream)
conn = TcpConnection(stream,address)
yield conn.send_messages()
通过使用协同程序,您可以按照希望代码执行的顺序编写代码,并且可以像 return 值一样将响应读取到局部变量中,而不必使用处理程序方法。每当你 yield
做某事时,你都会停下来等待它完成。
我还把send_message()
和receive_message()
分开了,因为我觉得这样更清楚。如果您认为将它们放在 send_message()
中更好,您可以这样做:
@gen.coroutine
def send_message(self,data):
yield self._stream.write(data)
return (yield self.receive_message())
如果您想先发送这两条消息,然后然后等待接收这两条消息,您也可以这样做:
@gen.coroutine
def send_messages(self):
yield self.send_message(b'hello \n')
yield self.send_message(b'world \n')
print((yield self.read_message()))
print((yield self.read_message()))
使用回调
任何你可以用协程编码的东西,你都可以用回调编码。但是,您需要做的是在回调之间跟踪您的状态(您所在的位置)。这可以通过在不同的回调之间跳来跳去来完成。例如:
def send_first_message(self):
self.send_message(b'hello \n', self.receive_first_response)
def receive_first_response(self, data):
print(data)
self.send_message(b'world \n', self.receive_second_response)
def receive_second_response(self, data):
print(data)
def read_message(self, callback):
self._stream.read_until(b'\n', callback)
def send_message(self, data, callback):
self._stream.write(data)
self.read_message(callback)
或者通过其他方式跟踪您在通信中的位置,例如在您的 class 实例的字段中存储一些内容。
我的 TCP 服务器是用 Tornado 的异步 TCP 创建的。客户端是用C写的。
服务器代码:
#! /usr/bin/env python
#coding=utf-8
from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
self.send_message(b'hello \n')
self.send_message(b'world \n')
def read_message(self):
self._stream.read_until(b'\n', self.handle_message)
def handle_message(self,data):
print(data)
def send_message(self,data):
self._stream.write(data)
self.read_message()
def on_close(self):
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
def handle_stream(self,stream,address):
print("new connection",address,stream)
TcpConnection(stream,address)
if __name__=='__main__':
print('server start .....')
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
客户代码:
#include <Winsock2.h>
#include <stdio.h>
#pragma comment(lib, "ws2_32.lib")
typedef struct SytemInit
{
char computer[32];
char user[32];
char os[256];
char processor[256];
char mem[128];
char disk[128];
}SYSTEMINIT;
typedef struct Command
{
int commandType;
char commandInfo[256];
}COMMAND;
void main()
{
int err;
SYSTEMINIT message;
COMMAND recvBuf;
SOCKET sockClient;
SOCKADDR_IN addrServer;
WSADATA wsaData;
WORD wVersionRequested;
wVersionRequested = MAKEWORD( 2, 2 );
err = WSAStartup( wVersionRequested, &wsaData );
if ( err != 0 )
{
return;
}
if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
{
WSACleanup( );
return;
}
sockClient = socket(AF_INET, SOCK_STREAM, 0);
addrServer.sin_addr.S_un.S_addr = inet_addr("172.16.110.1");
addrServer.sin_family = AF_INET;
addrServer.sin_port = htons(20000);
connect(sockClient, (SOCKADDR *)&addrServer, sizeof(SOCKADDR));
recv(sockClient, (char*)&recvBuf, 100, 0);
strcpy(message.computer,"zz-pc");
strcpy(message.disk,"zz-disk");
strcpy(message.mem,"zz-men");
strcpy(message.os,"zz-os");
strcpy(message.processor,"zz-processor");
strcpy(message.user,"zz-user");
send(sockClient, (char*)&message, sizeof(message) + 1, 0);
closesocket(sockClient);
WSACleanup();
}
执行时出现以下错误:
ERROR:tornado.application:Error in connection callback
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/tornado/tcpserver.py", line 269, in _handle_connection
future = self.handle_stream(stream, address)
File "/home/zz/PycharmProjects/monitor/test.py", line 34, in handle_stream
TcpConnection(stream,address)
File "/home/zz/PycharmProjects/monitor/test.py", line 15, in __init__
self.send_message(b'world \n')
File "/home/zz/PycharmProjects/monitor/test.py", line 25, in send_message
self.read_message()
File "/home/zz/PycharmProjects/monitor/test.py", line 18, in read_message
self._stream.read_until(b'\n', self.handle_message)
File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 270, in read_until
future = self._set_read_callback(callback)
File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 658, in _set_read_callback
assert self._read_callback is None, "Already reading"
AssertionError: Already reading
我猜这个错误是因为self.send_message(b'hello \n')
和self.send_message(b'world \n')
同时从套接字读取。
我该如何解决这个问题?
你的这个代码:
self.send_message(b'hello \n')
self.send_message(b'world \n')
结果如下:
self._stream.write(b'hello \n')
self._stream.read_until(b'\n', self.handle_message)
self._stream.write(b'world \n')
self._stream.read_until(b'\n', self.handle_message)
由于您是通过回调调用 read_until
,因此您正在尝试同时并行执行两个 read_until
。不过,这是无稽之谈,因为它们是通过 TCP 连接一个接一个地到达的。您必须先阅读一条消息,然后再阅读另一条消息。
我觉得使用 gen.coroutine
会使这更容易。您也可以使用回调来完成;我稍后会展示如何。
使用gen.coroutine
以下是我如何使用协程更改您的 TcpConnection
class:
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
@gen.coroutine
def send_messages(self):
yield self.send_message(b'hello \n')
response1 = yield self.read_message()
print(response1)
yield self.send_message(b'world \n')
# You can receive the result in-line, but you need to wrap with ( ):
print((yield self.read_message()))
def read_message(self):
return self._stream.read_until(b'\n')
def send_message(self,data):
return self._stream.write(data)
def on_close(self):
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
@gen.coroutine
def handle_stream(self,stream,address):
print("new connection",address,stream)
conn = TcpConnection(stream,address)
yield conn.send_messages()
通过使用协同程序,您可以按照希望代码执行的顺序编写代码,并且可以像 return 值一样将响应读取到局部变量中,而不必使用处理程序方法。每当你 yield
做某事时,你都会停下来等待它完成。
我还把send_message()
和receive_message()
分开了,因为我觉得这样更清楚。如果您认为将它们放在 send_message()
中更好,您可以这样做:
@gen.coroutine
def send_message(self,data):
yield self._stream.write(data)
return (yield self.receive_message())
如果您想先发送这两条消息,然后然后等待接收这两条消息,您也可以这样做:
@gen.coroutine
def send_messages(self):
yield self.send_message(b'hello \n')
yield self.send_message(b'world \n')
print((yield self.read_message()))
print((yield self.read_message()))
使用回调
任何你可以用协程编码的东西,你都可以用回调编码。但是,您需要做的是在回调之间跟踪您的状态(您所在的位置)。这可以通过在不同的回调之间跳来跳去来完成。例如:
def send_first_message(self):
self.send_message(b'hello \n', self.receive_first_response)
def receive_first_response(self, data):
print(data)
self.send_message(b'world \n', self.receive_second_response)
def receive_second_response(self, data):
print(data)
def read_message(self, callback):
self._stream.read_until(b'\n', callback)
def send_message(self, data, callback):
self._stream.write(data)
self.read_message(callback)
或者通过其他方式跟踪您在通信中的位置,例如在您的 class 实例的字段中存储一些内容。