Twisted:将数据写入同一个套接字后如何从客户端套接字读取?

Twisted: how to read from a client socket after writing data to the same socket?

我正在尝试用 twisted 编写一个简单的 TCP 服务器,它必须按顺序执行以下操作:

  1. 客户端连接到服务器,此连接的 KEEPALIVE 标志设置为 1。
  2. 服务器从客户端接收数据。
  3. 然后计算响应,这是一个列表。
  4. 然后服务器一个接一个地发送列表中的每一项,同时等待来自客户端的显式ACK,即发送列表中的单个项目后,服务器等待来自客户端的ACK数据包,并且只有在收到 ACK 后,它才会继续以相同的方式发送其余项目。

代码如下:

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyProtocol(Protocol):

    def connectionMade(self):
         try:
             self.transport.setTcpKeepAlive(1)
         except AttributeError: 
             pass
         self.deferred = Deferred()
         self.deferred.addCallback(self.factory.service.compute_response)
         self.deferred.addCallback(self.send_response)

    def dataReceived(self, data):
         self.fire(data)

    def fire(self, data):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.callback(data)

    def send_response(self, data):
        for item in data:
            d = Deferred()
            d.addCallback(self.transport.write)
            d.addCallback(self.wait_for_ack)
            d.callback(item)
        return   

    def wait_for_ack(self, dummy):
        try:
            self.transport.socket.recv(1024)
        except socket.error as e:
            print e
        return

在 运行 服务器和客户端我得到以下异常:

Resource temporarily unavailable

我了解此异常的原因 - 我正在尝试在非阻塞套接字上调用阻塞方法。

请帮我找到解决这个问题的方法。

您的示例存在一些问题:

  1. 您没有在任何地方(除其他外)定义 compute_response,所以我无法 运行 您的示例。考虑将其设为 http://sscce.org
  2. 永远不要在 Twisted 传输下的套接字上调用 sendrecv;让 Twisted 为您调用这些方法。在 recv 的情况下,它会将 recv 的结果传递给 dataReceived
  3. 您不能指望 dataReceived 来接收完整的消息;数据包可能 always be arbitrarily fragmented in transit 所以你需要有一个框架协议来封装你的消息。

但是,由于我的另一个答案非常拙劣,我欠你一个关于如何设置你想做的事情的更详尽的解释。

按照你的问题的规定,你的协议定义不够完整,无法给出答案;您不能使用原始 TCP 片段进行请求和响应,因为您的应用程序无法知道它们的开始和结束位置(请参阅上面的第 3 点)。所以,我为这个例子发明了一个小协议:它是一个行分隔的协议,客户端发送 "request foo\n",服务器立即发送 "thinking...\n",计算响应,然后发送 "response foo\n"并等待客户端发送"ok";作为响应,服务器将发送下一个 "response ..." 行,或者 "done\n" 行表明它已完成发送响应。

作为我们的协议,我相信你问题的关键要素是你不能 "wait for acknowledgement",或者就此而言,在 Twisted 中不能做任何其他事情。你需要做的是按照 "when an acknowledgement is received...".

的方式实现一些东西

因此,当收到消息时,我们需要识别消息的类型:确认还是请求?

  1. 如果是请求,我们需要计算响应;当响应计算完成后,我们需要将响应的所有元素加入队列并发送第一个元素。
  2. 如果是确认,我们需要检查响应的传出队列,如果有任何内容,发送它的第一个元素;否则,发送 "done".

这是一个完整的、运行可行的示例,它实现了我以这种方式描述的协议:

from twisted.internet.protocol import ServerFactory
from twisted.internet.task import deferLater
from twisted.internet import reactor
from twisted.internet.interfaces import ITCPTransport
from twisted.protocols.basic import LineReceiver

class MyProtocol(LineReceiver):
    delimiter = "\n"
    def connectionMade(self):
        if ITCPTransport.providedBy(self.transport):
            self.transport.setTcpKeepAlive(1)
        self.pendingResponses = []

    def lineReceived(self, line):
        split = line.rstrip("\r").split(None, 1)
        command = split[0]
        if command == b"request":
            # requesting a computed response
            payload = split[1]
            self.sendLine("thinking...")
            (self.factory.service.computeResponse(payload)
             .addCallback(self.sendResponses))
        elif command == b"ok":
            # acknowledging a response; send the next response
            if self.pendingResponses:
                self.sendOneResponse()
            else:
                self.sendLine(b"done")

    def sendOneResponse(self):
        self.sendLine(b"response " + self.pendingResponses.pop(0))

    def sendResponses(self, listOfResponses):
        self.pendingResponses.extend(listOfResponses)
        self.sendOneResponse()

class MyFactory(ServerFactory):
    protocol = MyProtocol

    def __init__(self, service):
        self.service = service

class MyService(object):
    def computeResponse(self, request):
        return deferLater(
            reactor, 1.0,
            lambda: [request + b" 1", request + b" 2", request + b" 3"]
        )

from twisted.internet.endpoints import StandardIOEndpoint
endpoint = StandardIOEndpoint(reactor)

endpoint.listen(MyFactory(MyService()))
reactor.run()

我已经在标准 I/O 上使这个 运行 可用,这样你就可以 运行 它并输入它来感受它是如何工作的;如果您想 运行 它在实际的网络端口上,只需将其替换为 different type of endpoint。希望这能回答您的问题。