Twisted:将数据写入同一个套接字后如何从客户端套接字读取?
Twisted: how to read from a client socket after writing data to the same socket?
我正在尝试用 twisted 编写一个简单的 TCP 服务器,它必须按顺序执行以下操作:
- 客户端连接到服务器,此连接的
KEEPALIVE
标志设置为 1。
- 服务器从客户端接收数据。
- 然后计算响应,这是一个列表。
- 然后服务器一个接一个地发送列表中的每一项,同时等待来自客户端的显式ACK,即发送列表中的单个项目后,服务器等待来自客户端的ACK数据包,并且只有在收到 ACK 后,它才会继续以相同的方式发送其余项目。
代码如下:
class MyFactory(ServerFactory):
protocol = MyProtocol
def __init__(self, service):
self.service = service
class MyProtocol(Protocol):
def connectionMade(self):
try:
self.transport.setTcpKeepAlive(1)
except AttributeError:
pass
self.deferred = Deferred()
self.deferred.addCallback(self.factory.service.compute_response)
self.deferred.addCallback(self.send_response)
def dataReceived(self, data):
self.fire(data)
def fire(self, data):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.callback(data)
def send_response(self, data):
for item in data:
d = Deferred()
d.addCallback(self.transport.write)
d.addCallback(self.wait_for_ack)
d.callback(item)
return
def wait_for_ack(self, dummy):
try:
self.transport.socket.recv(1024)
except socket.error as e:
print e
return
在 运行 服务器和客户端我得到以下异常:
Resource temporarily unavailable
我了解此异常的原因 - 我正在尝试在非阻塞套接字上调用阻塞方法。
请帮我找到解决这个问题的方法。
您的示例存在一些问题:
- 您没有在任何地方(除其他外)定义
compute_response
,所以我无法 运行 您的示例。考虑将其设为 http://sscce.org
- 永远不要在 Twisted 传输下的套接字上调用
send
或 recv
;让 Twisted 为您调用这些方法。在 recv
的情况下,它会将 recv
的结果传递给 dataReceived
。
- 您不能指望
dataReceived
来接收完整的消息;数据包可能 always be arbitrarily fragmented in transit 所以你需要有一个框架协议来封装你的消息。
但是,由于我的另一个答案非常拙劣,我欠你一个关于如何设置你想做的事情的更详尽的解释。
按照你的问题的规定,你的协议定义不够完整,无法给出答案;您不能使用原始 TCP 片段进行请求和响应,因为您的应用程序无法知道它们的开始和结束位置(请参阅上面的第 3 点)。所以,我为这个例子发明了一个小协议:它是一个行分隔的协议,客户端发送 "request foo\n"
,服务器立即发送 "thinking...\n"
,计算响应,然后发送 "response foo\n"
并等待客户端发送"ok"
;作为响应,服务器将发送下一个 "response ..."
行,或者 "done\n"
行表明它已完成发送响应。
作为我们的协议,我相信你问题的关键要素是你不能 "wait for acknowledgement",或者就此而言,在 Twisted 中不能做任何其他事情。你需要做的是按照 "when an acknowledgement is received...".
的方式实现一些东西
因此,当收到消息时,我们需要识别消息的类型:确认还是请求?
- 如果是请求,我们需要计算响应;当响应计算完成后,我们需要将响应的所有元素加入队列并发送第一个元素。
- 如果是确认,我们需要检查响应的传出队列,如果有任何内容,发送它的第一个元素;否则,发送 "done".
这是一个完整的、运行可行的示例,它实现了我以这种方式描述的协议:
from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver
class MyProtocol(LineReceiver):
delimiter = "\n"
def connectionMade(self):
if ITCPTransport.providedBy(self.transport):
self.transport.setTcpKeepAlive(1)
self.pendingResponses = []
def lineReceived(self, line):
split = line.rstrip("\r").split(None, 1)
command = split[0]
if command == b"request":
# requesting a computed response
payload = split[1]
self.sendLine("thinking...")
(self.factory.service.computeResponse(payload)
.addCallback(self.sendResponses))
elif command == b"ok":
# acknowledging a response; send the next response
if self.pendingResponses:
self.sendOneResponse()
else:
self.sendLine(b"done")
def sendOneResponse(self):
self.sendLine(b"response " + self.pendingResponses.pop(0))
def sendResponses(self, listOfResponses):
self.pendingResponses.extend(listOfResponses)
self.sendOneResponse()
class MyFactory(ServerFactory):
protocol = MyProtocol
def __init__(self, service):
self.service = service
class MyService(object):
def computeResponse(self, request):
return deferLater(
reactor, 1.0,
lambda: [request + b" 1", request + b" 2", request + b" 3"]
)
from twisted.internet.endpoints import StandardIOEndpoint
endpoint = StandardIOEndpoint(reactor)
endpoint.listen(MyFactory(MyService()))
reactor.run()
我已经在标准 I/O 上使这个 运行 可用,这样你就可以 运行 它并输入它来感受它是如何工作的;如果您想 运行 它在实际的网络端口上,只需将其替换为 different type of endpoint。希望这能回答您的问题。
我正在尝试用 twisted 编写一个简单的 TCP 服务器,它必须按顺序执行以下操作:
- 客户端连接到服务器,此连接的
KEEPALIVE
标志设置为 1。 - 服务器从客户端接收数据。
- 然后计算响应,这是一个列表。
- 然后服务器一个接一个地发送列表中的每一项,同时等待来自客户端的显式ACK,即发送列表中的单个项目后,服务器等待来自客户端的ACK数据包,并且只有在收到 ACK 后,它才会继续以相同的方式发送其余项目。
代码如下:
class MyFactory(ServerFactory):
protocol = MyProtocol
def __init__(self, service):
self.service = service
class MyProtocol(Protocol):
def connectionMade(self):
try:
self.transport.setTcpKeepAlive(1)
except AttributeError:
pass
self.deferred = Deferred()
self.deferred.addCallback(self.factory.service.compute_response)
self.deferred.addCallback(self.send_response)
def dataReceived(self, data):
self.fire(data)
def fire(self, data):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.callback(data)
def send_response(self, data):
for item in data:
d = Deferred()
d.addCallback(self.transport.write)
d.addCallback(self.wait_for_ack)
d.callback(item)
return
def wait_for_ack(self, dummy):
try:
self.transport.socket.recv(1024)
except socket.error as e:
print e
return
在 运行 服务器和客户端我得到以下异常:
Resource temporarily unavailable
我了解此异常的原因 - 我正在尝试在非阻塞套接字上调用阻塞方法。
请帮我找到解决这个问题的方法。
您的示例存在一些问题:
- 您没有在任何地方(除其他外)定义
compute_response
,所以我无法 运行 您的示例。考虑将其设为 http://sscce.org - 永远不要在 Twisted 传输下的套接字上调用
send
或recv
;让 Twisted 为您调用这些方法。在recv
的情况下,它会将recv
的结果传递给dataReceived
。 - 您不能指望
dataReceived
来接收完整的消息;数据包可能 always be arbitrarily fragmented in transit 所以你需要有一个框架协议来封装你的消息。
但是,由于我的另一个答案非常拙劣,我欠你一个关于如何设置你想做的事情的更详尽的解释。
按照你的问题的规定,你的协议定义不够完整,无法给出答案;您不能使用原始 TCP 片段进行请求和响应,因为您的应用程序无法知道它们的开始和结束位置(请参阅上面的第 3 点)。所以,我为这个例子发明了一个小协议:它是一个行分隔的协议,客户端发送 "request foo\n"
,服务器立即发送 "thinking...\n"
,计算响应,然后发送 "response foo\n"
并等待客户端发送"ok"
;作为响应,服务器将发送下一个 "response ..."
行,或者 "done\n"
行表明它已完成发送响应。
作为我们的协议,我相信你问题的关键要素是你不能 "wait for acknowledgement",或者就此而言,在 Twisted 中不能做任何其他事情。你需要做的是按照 "when an acknowledgement is received...".
的方式实现一些东西因此,当收到消息时,我们需要识别消息的类型:确认还是请求?
- 如果是请求,我们需要计算响应;当响应计算完成后,我们需要将响应的所有元素加入队列并发送第一个元素。
- 如果是确认,我们需要检查响应的传出队列,如果有任何内容,发送它的第一个元素;否则,发送 "done".
这是一个完整的、运行可行的示例,它实现了我以这种方式描述的协议:
from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver
class MyProtocol(LineReceiver):
delimiter = "\n"
def connectionMade(self):
if ITCPTransport.providedBy(self.transport):
self.transport.setTcpKeepAlive(1)
self.pendingResponses = []
def lineReceived(self, line):
split = line.rstrip("\r").split(None, 1)
command = split[0]
if command == b"request":
# requesting a computed response
payload = split[1]
self.sendLine("thinking...")
(self.factory.service.computeResponse(payload)
.addCallback(self.sendResponses))
elif command == b"ok":
# acknowledging a response; send the next response
if self.pendingResponses:
self.sendOneResponse()
else:
self.sendLine(b"done")
def sendOneResponse(self):
self.sendLine(b"response " + self.pendingResponses.pop(0))
def sendResponses(self, listOfResponses):
self.pendingResponses.extend(listOfResponses)
self.sendOneResponse()
class MyFactory(ServerFactory):
protocol = MyProtocol
def __init__(self, service):
self.service = service
class MyService(object):
def computeResponse(self, request):
return deferLater(
reactor, 1.0,
lambda: [request + b" 1", request + b" 2", request + b" 3"]
)
from twisted.internet.endpoints import StandardIOEndpoint
endpoint = StandardIOEndpoint(reactor)
endpoint.listen(MyFactory(MyService()))
reactor.run()
我已经在标准 I/O 上使这个 运行 可用,这样你就可以 运行 它并输入它来感受它是如何工作的;如果您想 运行 它在实际的网络端口上,只需将其替换为 different type of endpoint。希望这能回答您的问题。