Python Twisted 通过网络发送大文件

Python Twisted sending large a file across network

我正在尝试使用带有 LineReceiver 协议的 Twisted 通过网络发送文件。我看到的问题是,当我读取一个二进制文件并尝试发送它们根本不发送的块时。

我正在使用以下方式读取文件:

import json
import time
import threading
from twisted.internet import reactor, threads
from twisted.protocols.basic import LineReceiver
from twisted.internet import protocol

MaximumMsgSize = 15500

trySend = True
connectionToServer = None

class ClientInterfaceFactory(protocol.Factory):

    def buildProtocol(self, addr):
        return WoosterInterfaceProtocol(self._msgProcessor, self._logger)

class ClientInterfaceProtocol(LineReceiver):

    def connectionMade(self):
        connectionToServer = self

    def _DecodeMessage(self, rawMsg):
        header, body = json.loads(rawMsg)   
        return (header, json.loads(body))

    def ProcessIncomingMsg(self, rawMsg, connObject):
        # Decode raw message.
        decodedMsg = self._DecodeMessage(rawMsg)

        self.ProccessTransmitJobToNode(decodedMsg, connObject)

    def _BuildMessage(self, id, msgBody = {}):
        msgs = []

        fullMsgBody = json.dumps(msgBody)
        msgBodyLength = len(fullMsgBody)

        totalParts = 1 if msgBodyLength <= MaximumMsgSize else \
            int(math.ceil(msgBodyLength / MaximumMsgSize))

        startPoint = 0
        msgBodyPos = 0

        for partNo in range(totalParts):
            msgBodyPos = (partNo + 1) * MaximumMsgSize

            header = {'ID' : id, 'MsgParts' : totalParts,
                'MsgPart' : partNo }
            msg = (header, fullMsgBody[startPoint:msgBodyPos])
            jsonMsg = json.dumps(msg)       

            msgs.append(jsonMsg)
            startPoint = msgBodyPos

        return (msgs, '')

    def ProccessTransmitJobToNode(self, msg, connection):
        rootDir = '../documentation/configs/Wooster'

        exportedFiles = ['consoleLog.txt', 'blob.dat']
        params = {
            'Status' : 'buildStatus',
            'TaskID' : 'taskID',
            'Name' : 'taskName',
            'Exports' : len(exportedFiles),
            }
        msg, statusStr = self._BuildMessage(101, params)
        connection.sendLine(msg[0])

        for filename in exportedFiles:
            with open (filename, "rb") as exportFileHandle:
                data = exportFileHandle.read().encode('base64')

            params = {
                ExportFileToMaster_Tag.TaskID : taskID,
                ExportFileToMaster_Tag.FileContents : data,
                ExportFileToMaster_Tag.Filename : filename
            }
            msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)          
            for m in msgs: 
                connection.sendLine(m)

    def lineReceived(self, data):
        threads.deferToThread(self.ProcessIncomingMsg, data, self)


def ConnectFailed(reason):
    print 'Connection failed..'
    reactor.callLater(20, reactor.callFromThread, ConnectToServer)

def ConnectToServer():
    print 'Connecting...'
    from twisted.internet.endpoints import TCP4ClientEndpoint
    endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181)

    deferItem = endpoint.connect(factory)
    deferItem.addErrback(ConnectFailed)

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()

reactor.callFromThread(ConnectToServer)

factory = ClientInterfaceFactory()
protocol = ClientInterfaceProtocol()

while 1:
    time.sleep(0.01)

    if connectionToServer == None: continue

    if trySend == True:
        protocol.ProccessTransmitJobToNode(None, None)
        trySend = False

我做错了什么吗?文件已发送,当写入是多部分或有多个文件时它会遇到困难。

如果发生单次写入,则 m 注意:我已经用一段粗略的示例代码更新了问题,希望它有意义。

_BuildMessage returns 一个二元组:(msgs, '').

您的网络代码对此进行迭代:

msgs = self._BuildMessage(MsgID.ExportFileToMaster, params)

for m in msgs: 

因此您的网络代码首先尝试发送 json 编码数据列表,然后尝试发送空字符串。它很可能引发异常,因为您无法使用 sendLine 发送任何内容的列表。如果您没有看到异常,则说明您忘记启用日志记录。您应该始终启用日志记录,以便您可以看到发生的任何异常。

此外,您正在使用 time.sleep,您不应该在基于 Twisted 的程序中这样做。如果您这样做是为了避免接收器过载,您应该使用 TCP 的本机背压,而不是注册一个可以接收暂停和恢复通知的生产者。无论如何,time.sleep(以及您对所有数据的循环)将阻塞整个反应器线程并阻止取得任何进展。结果是大部分数据在发送前都会在本地缓存。

此外,您的代码从非反应器线程调用 LineReceiver.sendLine。这有未定义的结果,但您可以指望它不起作用。

这个循环在主线程中运行:

while 1:
    time.sleep(0.01)

    if connectionToServer == None: continue

    if trySend == True:
        protocol.ProccessTransmitJobToNode(None, None)
        trySend = False

当反应器在另一个线程中运行时:

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()

ProcessTransmitJobToNode 只需调用 self.sendLine:

def ProccessTransmitJobToNode(self, msg, connection):
    rootDir = '../documentation/configs/Wooster'

    exportedFiles = ['consoleLog.txt', 'blob.dat']
    params = {
        'Status' : 'buildStatus',
        'TaskID' : 'taskID',
        'Name' : 'taskName',
        'Exports' : len(exportedFiles),
        }
    msg, statusStr = self._BuildMessage(101, params)
    connection.sendLine(msg[0])

您或许应该从应用程序中完全删除线程的使用。使用 reactor.callLater 可以更好地管理基于时间的事件(您的主线程循环有效地每秒生成一次对 ProcessTransmitJobToNode 的调用(trySend 标志的模效应))。

您可能还想看看 https://github.com/twisted/tubes 作为使用 Twisted 管理大量数据的更好方法。