return 控制到传输

return control to the transport

我正在尝试模拟服务器定期接收数据的情况。在我的设置中,我 运行 一个设置服务器的进程和另一个设置一堆客户端的进程(足以考虑单个客户端)。我通过将主要来自 . The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This 的点点滴滴放在一起来设置一些代码。此外,每条消息都非常大,我尝试多次发送相同的消息,以便清除缓冲区。

我怀疑我看到的是 returning the control to the transport 的问题,但我不知道如何解决它。

非常感谢您对这个问题或任何其他跳到您的问题的帮助。

服务器:

from twisted.internet import reactor, protocol

import time
import serverSideAnalysis
import pdb
#import bson, json, msgpack
import _pickle as pickle  # I expect the users to authenticate and not 
                          # do anything malicious. 


PORT = 9000
NUM = 1
local_scratch="/local/scratch"


class Hub(protocol.Protocol):
  def __init__(self,factory, clients, nclients):
    self.clients = clients 
    self.nclients = nclients
    self.factory = factory
    self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self, 
          local_scratch)

  def connectionMade(self):
    print("connected to user" , (self))
    if len(self.clients) < self.nclients:
      self.factory.clients.append(self)
    else:
      self.factory.clients[self.nclients] = self
    if len(self.clients) == NUM:
      val = input("Looks like everyone is here, shall we start? (Y/N)")
      while (val.upper() != "Y"):
        time.sleep(20)
        val = input("Looks like everyone is here, shall we start??? (Y/N)")
      message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
      self.message(message) # This reaches the client as I had expected

  def message(self, command):
    for c in self.factory.clients:
      c.transport.write(command)

  def connectionLost(self, reason):
    self.factory.clients.remove(self)
    self.nclients -= 1

  def dataReceived(self, data):
    if len(self.clients) == NUM:
      self.dispatcher.dispatch(data)

class PauseTransport(protocol.Protocol):
  def makeConnection(self, transport):
    transport.pauseProducing()

class HubFactory(protocol.Factory):
  def __init__(self, num):
    self.clients = set([])
    self.nclients = 0 
    self.totConnections = num

  def buildProtocol(self, addr):
    print(self.nclients)
    if self.nclients < self.totConnections:
      self.nclients += 1
      return Hub(self, self.clients, self.nclients)
    protocol = PauseTransport()
    protocol.factory = self
    return protocol

factory = HubFactory(NUM)
reactor.listenTCP(PORT, factory)
factory.clients = []
reactor.run()

客户:

from twisted.internet import reactor, protocol
import time
import clientSideAnalysis
import sys


HOST = 'localhost'
PORT = 9000
local_scratch="/local/scratch"

class MyClient(protocol.Protocol):

  def connectionMade(self):
    print("connected!")
    self.factory.clients.append(self)
    print ("clients are ", self.factory.clients)

    self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

  def clientConnectionLost(self, reason):
    #TODO send warning
    self.factory.clients.remove(self)

  def dataReceived(self, data): #This is the problematic part I think
    self.cdispatcher.dispatch(data)
    print("1 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    print("2 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    time.sleep(10)


  def message(self, data):
    self.transport.write(data)

class MyClientFactory(protocol.ClientFactory):
  protocol = MyClient

if __name__=="__main__":
  analysis_file_name = sys.argv[1]

  factory = MyClientFactory()
  reactor.connectTCP(HOST, PORT, factory)
  factory.clients = []
  reactor.run()

关于调度员做什么的最后一点相关信息。

在这两种情况下,他们都会加载已到达的消息(字典)并根据内容进行一些计算。每隔一段时间,他们就会使用 message 方法与他们的当前值进行通信。

最后,我使用的是 python 3.6。和扭曲的 18.9.0

您通过 Protocol.dataReceived 方法 return 控制反应器的方式就是您通过该方法 return 的方式。例如:

def dataReceived(self, data):
    self.cdispatcher.dispatch(data)
    print("1 sent")

如果您希望在之后完成更多工作,您有一些选择。如果您希望工作在经过一段时间后发生,请使用 reactor.callLater。如果您希望工作在分派到另一个线程后发生,请使用 twisted.internet.threads.deferToThread。如果您希望工作响应某些其他事件(例如,接收到数据)而发生,请将其放在处理该事件的回调中(例如,dataReceived)。