在无协程函数中,如何从 Tornado TCP 的协程函数中获取 return 值?
In a no-coroutine function, how do you get a return value from coroutine function of Tornado TCP?
我用Tornado TCP写了3段代码。我遇到了一些困难。
我的代码如下:
client.py
'''tcp client'''
from socket import socket, AF_INET, SOCK_STREAM
s = socket(AF_INET, SOCK_STREAM)
s.connect(('localhost', 20000))
resp = s.recv(8192)
print('Response:', resp)
s.send(b'Hello\n')
s.close()
server.py
'''tcp server'''
#! /usr/bin/env python
#coding=utf-8
from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop
from tornado.gen import *
clientDict=dict() #save infomation of client
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
@coroutine
def send_messages(self):
yield self.send_message(b'world \n')
response = yield self.read_message()
return response
def read_message(self):
return self._stream.read_until(b'\n')
def send_message(self,data):
return self._stream.write(data)
def on_close(self):
global clientDict
clientDict.pop(self._address)
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
@coroutine
def handle_stream(self,stream,address):
global clientDict
print("new connection",address,stream)
clientDict.setdefault(address, TcpConnection(stream,address))
if __name__=='__main__':
print('server start .....')
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
main.py
import time
from threading import Thread
import copy
from server import *
def watchClient():
'''call the "send" function when a new client connect''
global clientDict
print('start watch')
lastClientList=list()
while True:
currentClientList=copy.deepcopy([key for key in clientDict.keys()])
difference=list(set(currentClientList).difference(set(lastClientList)))
if len(difference)>0:
send(difference)
lastClientList=copy.deepcopy(currentClientList)
time.sleep(5)
else:
time.sleep(5)
continue
def send(addressList):
'''send message to a new client and get response'''
global clientDict
for address in addressList:
response=clientDict[address].send_messages()
print(address," response :",response)
def listen():
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
if __name__=='__main__':
listenThread=Thread(target=listen)
watchThead=Thread(target=watchClient)
watchThead.start()
listenThread.start()
我想在 main.py 为 运行 时获取 "print information"--- 地址,响应:b'hello\n'
但实际上我得到的 "print information" 是 ----
('127.0.0.1', 41233) response :<tornado.concurrent.Future object at 0x7f2894d30518>
它不能 return b'hello\n'。
那我猜它在无协程函数(def send(addressList))中无法从协程函数(@中得到合理的响应协程 def send_messages(self)).
如何解决?
顺便说一下,我想知道如何让 clientDict 成为 class MonitorServer[ 的 属性 =46=],不是全局 属性。
请帮我!谢谢。
一般来说,任何调用协程的东西都应该是协程本身。混合线程和协程可能非常棘手;大多数基于协程的代码故意不是线程安全的。
在不同的线程上从非协程函数调用协程的正确方法是这样的:
def call_coro_from_thread(f, *args, **kwargs):
q = queue.Queue()
def wrapper():
fut = f(*args, **kwargs)
fut.add_done_callback(q.put)
IOLoop.instance().add_callback(wrapper)
fut = q.get()
return fut.result()
IOLoop.add_callback
是为了安全地将控制转移到 IOLoop 线程,然后使用 Queue 将结果传回。
我用Tornado TCP写了3段代码。我遇到了一些困难。
我的代码如下:
client.py
'''tcp client'''
from socket import socket, AF_INET, SOCK_STREAM
s = socket(AF_INET, SOCK_STREAM)
s.connect(('localhost', 20000))
resp = s.recv(8192)
print('Response:', resp)
s.send(b'Hello\n')
s.close()
server.py
'''tcp server'''
#! /usr/bin/env python
#coding=utf-8
from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop
from tornado.gen import *
clientDict=dict() #save infomation of client
class TcpConnection(object):
def __init__(self,stream,address):
self._stream=stream
self._address=address
self._stream.set_close_callback(self.on_close)
@coroutine
def send_messages(self):
yield self.send_message(b'world \n')
response = yield self.read_message()
return response
def read_message(self):
return self._stream.read_until(b'\n')
def send_message(self,data):
return self._stream.write(data)
def on_close(self):
global clientDict
clientDict.pop(self._address)
print("the monitored %d has left",self._address)
class MonitorServer(TCPServer):
@coroutine
def handle_stream(self,stream,address):
global clientDict
print("new connection",address,stream)
clientDict.setdefault(address, TcpConnection(stream,address))
if __name__=='__main__':
print('server start .....')
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
main.py
import time
from threading import Thread
import copy
from server import *
def watchClient():
'''call the "send" function when a new client connect''
global clientDict
print('start watch')
lastClientList=list()
while True:
currentClientList=copy.deepcopy([key for key in clientDict.keys()])
difference=list(set(currentClientList).difference(set(lastClientList)))
if len(difference)>0:
send(difference)
lastClientList=copy.deepcopy(currentClientList)
time.sleep(5)
else:
time.sleep(5)
continue
def send(addressList):
'''send message to a new client and get response'''
global clientDict
for address in addressList:
response=clientDict[address].send_messages()
print(address," response :",response)
def listen():
server=MonitorServer()
server.listen(20000)
IOLoop.instance().start()
if __name__=='__main__':
listenThread=Thread(target=listen)
watchThead=Thread(target=watchClient)
watchThead.start()
listenThread.start()
我想在 main.py 为 运行 时获取 "print information"--- 地址,响应:b'hello\n'
但实际上我得到的 "print information" 是 ----
('127.0.0.1', 41233) response :<tornado.concurrent.Future object at 0x7f2894d30518>
它不能 return b'hello\n'。
那我猜它在无协程函数(def send(addressList))中无法从协程函数(@中得到合理的响应协程 def send_messages(self)).
如何解决?
顺便说一下,我想知道如何让 clientDict 成为 class MonitorServer[ 的 属性 =46=],不是全局 属性。 请帮我!谢谢。
一般来说,任何调用协程的东西都应该是协程本身。混合线程和协程可能非常棘手;大多数基于协程的代码故意不是线程安全的。
在不同的线程上从非协程函数调用协程的正确方法是这样的:
def call_coro_from_thread(f, *args, **kwargs):
q = queue.Queue()
def wrapper():
fut = f(*args, **kwargs)
fut.add_done_callback(q.put)
IOLoop.instance().add_callback(wrapper)
fut = q.get()
return fut.result()
IOLoop.add_callback
是为了安全地将控制转移到 IOLoop 线程,然后使用 Queue 将结果传回。