使用扭曲的 FTP 流式传输 HTTP 正文

Streaming HTTP body over FTP with twisted

我有一个自定义 FTP 服务器,它与 api 进行文件夹列表等通信,文件作为 url 从 api 返回。我正在尝试打开一个指向这些 url 的 http 流,并通过 ftp 客户端(以非阻塞方式)反馈数据,但我不知道如何连接它。

我试着举了一个最小的例子来更好地解释我的问题。在这个例子中,它在端口 2121 上启动了一个本地 FTP 服务器,它列出了本地文件系统,但是当下载文件时,它 returns www.yahoo.com 的内容主体而不是文件数据。

我尝试通过 io.BytesIO 对象缓冲数据,但没有数据通过。我想知道这是否是正确的方法,还是因为读取指针可能始终位于文件对象的末尾?

示例代码如下:

import io
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell, _FileReader
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess, FilePasswordDB
from twisted.internet import defer

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])

class StreamWriter(Protocol):
    def __init__(self, finished, stream):
        self.finished = finished
        self.stream = stream

    def dataReceived(self, bytes):
        self.stream.write(bytes)

    def connectionLost(self, reason):
        print 'Finished receiving body:', reason.type, reason.value
        self.finished.callback(None)


def streamBody(response, stream):
    finished = Deferred()
    response.deliverBody(StreamWriter(finished, stream))
    return finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    stream = io.BytesIO()
    d.addCallback(lambda resp: streamBody(resp, stream))
    d.addErrback(log.err)

    return defer.succeed(_FileReader(stream))

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

编辑

class FinishNotifier(ProtocolToConsumerAdapter, Protocol):
    def __init__(self, original):
        ProtocolToConsumerAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.yahoo.com")

    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

更新的可运行示例:

from twisted.python import log
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ConsumerToProtocolAdapter, connectionDone
from twisted.web.client import Agent, ContentDecoderAgent, GzipDecoder, HTTPConnectionPool, HTTPClientFactory
from twisted.protocols.ftp import FTPFactory, FTPRealm, FTPAnonymousShell
from twisted.cred.portal import Portal
from twisted.cred.checkers import AllowAnonymousAccess
from twisted.internet import defer
from twisted.internet.error import ConnectionDone
from twisted.web._newclient import ResponseDone

agent = ContentDecoderAgent(Agent(reactor), [('gzip', GzipDecoder)])
# pool= HTTPConnectionPool(reactor,persistent=True)
# pool.maxPersistentPerHost = 2
# agent= Agent(reactor,pool=pool,connectTimeout=5)

class FinishNotifier(ConsumerToProtocolAdapter):
    def __init__(self, original):
        ConsumerToProtocolAdapter.__init__(self, original)
        self.finished = defer.Deferred()

    def connectionLost(self, reason=connectionDone):
        reason.trap(ConnectionDone, ResponseDone)
        print "finished"
        self.finished.callback(None)

class HTTP2FTP(object):
    def __init__(self, response):
        self.response = response

    def send(self, consumer):
        print consumer
        protocol = FinishNotifier(consumer)
        self.response.deliverBody(protocol)
        return protocol.finished

def openForReading(self, path):
    d = agent.request("GET", "http://www.testtest.com")
    d.addCallback(HTTP2FTP)
    d.addErrback(log.err)

    return d

def main():

    FTPAnonymousShell.openForReading = openForReading

    p = Portal(FTPRealm('./'), [AllowAnonymousAccess()])

    f = FTPFactory(p)

    reactor.listenTCP(2121, f)
    reactor.run()

if __name__ == "__main__":
    main()

if it's because the read pointer is always at the end of the file object perhaps?

大概是这样。你有两件事同时发生。 HTTP 客户端正在写入 BytesIO 实例,而 FTP 客户端正在从中读取。 _FileReader(私有 API,Twisted 的 FTP 库的实现细节,而不是您实际应该使用的东西)用于从已经完成的文件中读取 - 而不是正在增长的文件 as 正在读取它。

幸运的是,无需通过不友好的异步 file 界面。查看 openForReading 应该 return 的类型 - IReadFile 提供商。 IReadFile 有一个方法 send,它接受一个提供 IConsumer.

的对象

另一方面,您有 deliverBody,它接受 IProtocol。此协议已将 HTTP 响应正文传递给它。这是您要提供给 IConsumer 传递给 IReadFile.send 的数据。

因此,不要试图让这两个部分与 BytesIO 一起工作,而是使用涉及的两个接口让它们一起工作:IProtocolIConsumer。这是草图(里面有很多错误,但总体形状是正确的):

from twisted.internet.protocol import ConsumerToProtocolAdapter
from twisted.internet.interfaces import IPushProducer
from twisted.protocols.ftp import IReadFile

class FinishNotifier(ConsumerToProtocolAdapter):
    def connectionLost(self, reason):
        reason.trap(ConnectionDone)
        self.finished.callback(None)

@implementer(IReadFile, IPushProducer)
class HTTP2FTP(object):
    def send(self, consumer):
        protocol = FinishNotifier(consumer)
        response.deliverBody(protocol)
        # Lazy hack.
        # This code probably belongs in protocol.connectionMade instead.
        self._producer = protocol.transport
        consumer.registerProducer(self._producer, streaming=True)
        protocol.finished.addCallback(
            lambda ignored: consumer.unregisterProducer()
        )
        return protocol.finished

    def pauseProducing(self):
        self._producer.pauseProducing()

    def resumeProducing(self):
        self._producer.resumeProducing()

    def stopProducing(self):
        self._producer.stopProducing()

请注意,通过在此处实施 IPushProducer,我们可以在 HTTP 和 FTP 连接之间进行流量控制(这样即使 HTTP 连接传输大量数据,服务器上的内存使用也会受到限制比 FTP 连接更快)。这是一件很酷的事情,很高兴它只需要几行额外的代码来实现。稍微不那么酷的是,您 必须 在正确的时间进行 unregisterProducer 调用。 FTP 协议实现使用它作为数据已完全传输的指示。这在 Twisted 中可能没有充分记录(这是一个应该纠正的疏忽)。