Python Twisted 通过网络发送大文件
Python Twisted sending large a file across network
我正在尝试使用带有 LineReceiver 协议的 Twisted 通过网络发送文件。我看到的问题是,当我读取一个二进制文件并尝试发送它们根本不发送的块时。
我正在使用以下方式读取文件:
import json
import time
import threading
from twisted.internet import reactor, threads
from twisted.protocols.basic import LineReceiver
from twisted.internet import protocol
MaximumMsgSize = 15500
trySend = True
connectionToServer = None
class ClientInterfaceFactory(protocol.Factory):
def buildProtocol(self, addr):
return WoosterInterfaceProtocol(self._msgProcessor, self._logger)
class ClientInterfaceProtocol(LineReceiver):
def connectionMade(self):
connectionToServer = self
def _DecodeMessage(self, rawMsg):
header, body = json.loads(rawMsg)
return (header, json.loads(body))
def ProcessIncomingMsg(self, rawMsg, connObject):
# Decode raw message.
decodedMsg = self._DecodeMessage(rawMsg)
self.ProccessTransmitJobToNode(decodedMsg, connObject)
def _BuildMessage(self, id, msgBody = {}):
msgs = []
fullMsgBody = json.dumps(msgBody)
msgBodyLength = len(fullMsgBody)
totalParts = 1 if msgBodyLength <= MaximumMsgSize else \
int(math.ceil(msgBodyLength / MaximumMsgSize))
startPoint = 0
msgBodyPos = 0
for partNo in range(totalParts):
msgBodyPos = (partNo + 1) * MaximumMsgSize
header = {'ID' : id, 'MsgParts' : totalParts,
'MsgPart' : partNo }
msg = (header, fullMsgBody[startPoint:msgBodyPos])
jsonMsg = json.dumps(msg)
msgs.append(jsonMsg)
startPoint = msgBodyPos
return (msgs, '')
def ProccessTransmitJobToNode(self, msg, connection):
rootDir = '../documentation/configs/Wooster'
exportedFiles = ['consoleLog.txt', 'blob.dat']
params = {
'Status' : 'buildStatus',
'TaskID' : 'taskID',
'Name' : 'taskName',
'Exports' : len(exportedFiles),
}
msg, statusStr = self._BuildMessage(101, params)
connection.sendLine(msg[0])
for filename in exportedFiles:
with open (filename, "rb") as exportFileHandle:
data = exportFileHandle.read().encode('base64')
params = {
ExportFileToMaster_Tag.TaskID : taskID,
ExportFileToMaster_Tag.FileContents : data,
ExportFileToMaster_Tag.Filename : filename
}
msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)
for m in msgs:
connection.sendLine(m)
def lineReceived(self, data):
threads.deferToThread(self.ProcessIncomingMsg, data, self)
def ConnectFailed(reason):
print 'Connection failed..'
reactor.callLater(20, reactor.callFromThread, ConnectToServer)
def ConnectToServer():
print 'Connecting...'
from twisted.internet.endpoints import TCP4ClientEndpoint
endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181)
deferItem = endpoint.connect(factory)
deferItem.addErrback(ConnectFailed)
netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()
reactor.callFromThread(ConnectToServer)
factory = ClientInterfaceFactory()
protocol = ClientInterfaceProtocol()
while 1:
time.sleep(0.01)
if connectionToServer == None: continue
if trySend == True:
protocol.ProccessTransmitJobToNode(None, None)
trySend = False
我做错了什么吗?文件已发送,当写入是多部分或有多个文件时它会遇到困难。
如果发生单次写入,则 m
注意:我已经用一段粗略的示例代码更新了问题,希望它有意义。
_BuildMessage
returns 一个二元组:(msgs, '')
.
您的网络代码对此进行迭代:
msgs = self._BuildMessage(MsgID.ExportFileToMaster, params)
for m in msgs:
因此您的网络代码首先尝试发送 json 编码数据列表,然后尝试发送空字符串。它很可能引发异常,因为您无法使用 sendLine
发送任何内容的列表。如果您没有看到异常,则说明您忘记启用日志记录。您应该始终启用日志记录,以便您可以看到发生的任何异常。
此外,您正在使用 time.sleep
,您不应该在基于 Twisted 的程序中这样做。如果您这样做是为了避免接收器过载,您应该使用 TCP 的本机背压,而不是注册一个可以接收暂停和恢复通知的生产者。无论如何,time.sleep
(以及您对所有数据的循环)将阻塞整个反应器线程并阻止取得任何进展。结果是大部分数据在发送前都会在本地缓存。
此外,您的代码从非反应器线程调用 LineReceiver.sendLine
。这有未定义的结果,但您可以指望它不起作用。
这个循环在主线程中运行:
while 1:
time.sleep(0.01)
if connectionToServer == None: continue
if trySend == True:
protocol.ProccessTransmitJobToNode(None, None)
trySend = False
当反应器在另一个线程中运行时:
netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()
ProcessTransmitJobToNode
只需调用 self.sendLine
:
def ProccessTransmitJobToNode(self, msg, connection):
rootDir = '../documentation/configs/Wooster'
exportedFiles = ['consoleLog.txt', 'blob.dat']
params = {
'Status' : 'buildStatus',
'TaskID' : 'taskID',
'Name' : 'taskName',
'Exports' : len(exportedFiles),
}
msg, statusStr = self._BuildMessage(101, params)
connection.sendLine(msg[0])
您或许应该从应用程序中完全删除线程的使用。使用 reactor.callLater
可以更好地管理基于时间的事件(您的主线程循环有效地每秒生成一次对 ProcessTransmitJobToNode
的调用(trySend
标志的模效应))。
您可能还想看看 https://github.com/twisted/tubes 作为使用 Twisted 管理大量数据的更好方法。
我正在尝试使用带有 LineReceiver 协议的 Twisted 通过网络发送文件。我看到的问题是,当我读取一个二进制文件并尝试发送它们根本不发送的块时。
我正在使用以下方式读取文件:
import json
import time
import threading
from twisted.internet import reactor, threads
from twisted.protocols.basic import LineReceiver
from twisted.internet import protocol
MaximumMsgSize = 15500
trySend = True
connectionToServer = None
class ClientInterfaceFactory(protocol.Factory):
def buildProtocol(self, addr):
return WoosterInterfaceProtocol(self._msgProcessor, self._logger)
class ClientInterfaceProtocol(LineReceiver):
def connectionMade(self):
connectionToServer = self
def _DecodeMessage(self, rawMsg):
header, body = json.loads(rawMsg)
return (header, json.loads(body))
def ProcessIncomingMsg(self, rawMsg, connObject):
# Decode raw message.
decodedMsg = self._DecodeMessage(rawMsg)
self.ProccessTransmitJobToNode(decodedMsg, connObject)
def _BuildMessage(self, id, msgBody = {}):
msgs = []
fullMsgBody = json.dumps(msgBody)
msgBodyLength = len(fullMsgBody)
totalParts = 1 if msgBodyLength <= MaximumMsgSize else \
int(math.ceil(msgBodyLength / MaximumMsgSize))
startPoint = 0
msgBodyPos = 0
for partNo in range(totalParts):
msgBodyPos = (partNo + 1) * MaximumMsgSize
header = {'ID' : id, 'MsgParts' : totalParts,
'MsgPart' : partNo }
msg = (header, fullMsgBody[startPoint:msgBodyPos])
jsonMsg = json.dumps(msg)
msgs.append(jsonMsg)
startPoint = msgBodyPos
return (msgs, '')
def ProccessTransmitJobToNode(self, msg, connection):
rootDir = '../documentation/configs/Wooster'
exportedFiles = ['consoleLog.txt', 'blob.dat']
params = {
'Status' : 'buildStatus',
'TaskID' : 'taskID',
'Name' : 'taskName',
'Exports' : len(exportedFiles),
}
msg, statusStr = self._BuildMessage(101, params)
connection.sendLine(msg[0])
for filename in exportedFiles:
with open (filename, "rb") as exportFileHandle:
data = exportFileHandle.read().encode('base64')
params = {
ExportFileToMaster_Tag.TaskID : taskID,
ExportFileToMaster_Tag.FileContents : data,
ExportFileToMaster_Tag.Filename : filename
}
msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)
for m in msgs:
connection.sendLine(m)
def lineReceived(self, data):
threads.deferToThread(self.ProcessIncomingMsg, data, self)
def ConnectFailed(reason):
print 'Connection failed..'
reactor.callLater(20, reactor.callFromThread, ConnectToServer)
def ConnectToServer():
print 'Connecting...'
from twisted.internet.endpoints import TCP4ClientEndpoint
endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181)
deferItem = endpoint.connect(factory)
deferItem.addErrback(ConnectFailed)
netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()
reactor.callFromThread(ConnectToServer)
factory = ClientInterfaceFactory()
protocol = ClientInterfaceProtocol()
while 1:
time.sleep(0.01)
if connectionToServer == None: continue
if trySend == True:
protocol.ProccessTransmitJobToNode(None, None)
trySend = False
我做错了什么吗?文件已发送,当写入是多部分或有多个文件时它会遇到困难。
如果发生单次写入,则 m 注意:我已经用一段粗略的示例代码更新了问题,希望它有意义。
_BuildMessage
returns 一个二元组:(msgs, '')
.
您的网络代码对此进行迭代:
msgs = self._BuildMessage(MsgID.ExportFileToMaster, params)
for m in msgs:
因此您的网络代码首先尝试发送 json 编码数据列表,然后尝试发送空字符串。它很可能引发异常,因为您无法使用 sendLine
发送任何内容的列表。如果您没有看到异常,则说明您忘记启用日志记录。您应该始终启用日志记录,以便您可以看到发生的任何异常。
此外,您正在使用 time.sleep
,您不应该在基于 Twisted 的程序中这样做。如果您这样做是为了避免接收器过载,您应该使用 TCP 的本机背压,而不是注册一个可以接收暂停和恢复通知的生产者。无论如何,time.sleep
(以及您对所有数据的循环)将阻塞整个反应器线程并阻止取得任何进展。结果是大部分数据在发送前都会在本地缓存。
此外,您的代码从非反应器线程调用 LineReceiver.sendLine
。这有未定义的结果,但您可以指望它不起作用。
这个循环在主线程中运行:
while 1:
time.sleep(0.01)
if connectionToServer == None: continue
if trySend == True:
protocol.ProccessTransmitJobToNode(None, None)
trySend = False
当反应器在另一个线程中运行时:
netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()
ProcessTransmitJobToNode
只需调用 self.sendLine
:
def ProccessTransmitJobToNode(self, msg, connection):
rootDir = '../documentation/configs/Wooster'
exportedFiles = ['consoleLog.txt', 'blob.dat']
params = {
'Status' : 'buildStatus',
'TaskID' : 'taskID',
'Name' : 'taskName',
'Exports' : len(exportedFiles),
}
msg, statusStr = self._BuildMessage(101, params)
connection.sendLine(msg[0])
您或许应该从应用程序中完全删除线程的使用。使用 reactor.callLater
可以更好地管理基于时间的事件(您的主线程循环有效地每秒生成一次对 ProcessTransmitJobToNode
的调用(trySend
标志的模效应))。
您可能还想看看 https://github.com/twisted/tubes 作为使用 Twisted 管理大量数据的更好方法。