Asynchronous download of files with twisted and (tx)requests

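I'm trying to download a file from the Internet from within a Twisted application. I'd like to do this using requests, because of the other features it either provides directly or has well-maintained libraries to provide (retries, proxies, cachecontrol, etc.). I am open to a Twisted-only solution that doesn't have these features, but I can't seem to find one anyway.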

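The files should be fairly large and will be downloaded over slow connections. I'm therefore using requests' stream=True interface and the response's iter_content. A more or less complete code fragment is listed at the end of this question. The entry point is the http_download function, called with a url, a dst to write the file to, a callback, and an optional errback to handle a failed download. I've stripped out some of the code involved in preparing the destination (creating folders, etc.) and the code that closes the session during reactor exit, but I think it should still work as is.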

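This code works. The file gets downloaded, and the Twisted reactor continues to operate. However, I seem to have a problem with this bit of code: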

def _stream_download(r, f):
    for chunk in r.iter_content(chunk_size=128):
        f.write(chunk)
        yield None

cooperative_dl = cooperate(_stream_download(response, filehandle))

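Because iter_content returns only when it has a chunk to return, the reactor handles a chunk, runs other bits of code, then goes back to waiting for the next chunk instead of keeping itself busy updating a spinning wait animation on the GUI (code not actually posted here).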

Here is the complete code fragment, followed by the questions:

import os
import re
from functools import partial
from six.moves.urllib.parse import urlparse

from requests import HTTPError
from twisted.internet.task import cooperate
from txrequests import Session

class HttpClientMixin(object):
    def __init__(self, *args, **kwargs):
        self._http_session = None

    def http_download(self, url, dst, callback, errback=None, **kwargs):
        dst = os.path.abspath(dst)
        # Log request
        deferred_response = self.http_session.get(url, stream=True, **kwargs)
        deferred_response.addCallback(self._http_check_response)
        deferred_response.addCallbacks(
            partial(self._http_download, destination=dst, callback=callback),
            partial(self._http_error_handler, url=url, errback=errback)
        )

    def _http_download(self, response, destination=None, callback=None):
        def _stream_download(r, f):
            for chunk in r.iter_content(chunk_size=128):
                f.write(chunk)
                yield None

        # An errback receives the Failure as its first positional argument.
        def _rollback(failure, r, f, d):
            if r:
                r.close()
            if f:
                f.close()
            if os.path.exists(d):
                os.remove(d)

        filehandle = open(destination, 'wb')
        cooperative_dl = cooperate(_stream_download(response, filehandle))
        # Call close(); referencing the bound method alone closes nothing.
        cooperative_dl.whenDone().addCallback(lambda _: response.close())
        cooperative_dl.whenDone().addCallback(lambda _: filehandle.close())
        cooperative_dl.whenDone().addCallback(
            partial(callback, url=response.url, destination=destination)
        )
        cooperative_dl.whenDone().addErrback(
            partial(_rollback, r=response, f=filehandle, d=destination)
        )

    def _http_error_handler(self, failure, url=None, errback=None):
        failure.trap(HTTPError)
        # Log error message
        if errback:
            errback(failure)

    @staticmethod
    def _http_check_response(response):
        response.raise_for_status()
        return response

    @property
    def http_session(self):
        if not self._http_session:
            # Log session start
            self._http_session = Session()
        return self._http_session

Is there a way to get twisted to operate on this generator in such a way that it yields control when the generator itself is not prepared to yield something?

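No. All Twisted can do is call the code. If the code blocks indefinitely, then the calling thread is blocked indefinitely. This is a basic premise of the Python runtime.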
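To make that concrete, here is a toy round-robin loop in plain Python. It is only a stand-in for a cooperative scheduler, not Twisted's actual implementation, and the names run_round_robin, slow_chunks and spinner are made up for this illustration. The time.sleep call plays the role of iter_content waiting on the network: the scheduler cannot regain control until next() returns.

```python
import time


def run_round_robin(tasks):
    """Toy cooperative scheduler: call next() on each task in turn.

    Returns a log of (task name, seconds spent inside next()).
    """
    log = []
    pending = list(tasks)
    while pending:
        still_running = []
        for name, gen in pending:
            start = time.monotonic()
            try:
                next(gen)  # if this blocks, the whole loop blocks with it
            except StopIteration:
                continue
            log.append((name, time.monotonic() - start))
            still_running.append((name, gen))
        pending = still_running
    return log


def slow_chunks(count, delay):
    """Stand-in for a download generator whose next chunk arrives slowly."""
    for _ in range(count):
        time.sleep(delay)  # blocking wait, like iter_content on a slow link
        yield None


def spinner(count):
    """Stand-in for a cheap task such as a GUI animation tick."""
    for _ in range(count):
        yield None


log = run_round_robin([
    ("download", slow_chunks(3, 0.05)),
    ("spinner", spinner(3)),
])
for name, elapsed in log:
    print("%-8s %.3fs" % (name, elapsed))
```

The spinner gets exactly one tick per chunk, because every pass through the loop is held up inside the download generator's next() call.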

Is there a way to get twisted to download files asynchronously using something full-featured like requests?

There's treq. You didn't say what "full-featured" means here, but earlier you mentioned "retries", "proxies", and "cachecontrol". I don't believe treq currently has these features. You can find some kind of feature matrix in the treq docs (though I notice it doesn't include any of the features you mentioned, even for requests). I expect implementations of such features would be welcome as treq contributions.
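A minimal sketch of a streaming download with treq might look like the following. It assumes treq is installed and relies on treq.get with unbuffered=True and on treq.collect; the helper names check_status, close_and_pass and treq_download are hypothetical, and retries, proxies and caching are not handled.

```python
def check_status(response):
    """Reject HTTP error statuses; treq responses expose the status as .code."""
    if response.code >= 400:
        raise IOError("HTTP status %d" % response.code)
    return response


def close_and_pass(result, f):
    """Close the file and pass the result (or Failure) along unchanged."""
    f.close()
    return result


def treq_download(url, dst, **kwargs):
    """Return a Deferred that fires once the body has been written to dst."""
    # Third-party import kept local so the helpers above work without treq.
    import treq

    def stream_to_file(response):
        f = open(dst, "wb")
        # collect() calls f.write with each chunk as it arrives, so the
        # reactor is free to run other work between chunks.
        d = treq.collect(response, f.write)
        d.addBoth(close_and_pass, f)
        return d

    # unbuffered=True asks treq not to buffer the whole body in memory.
    d = treq.get(url, unbuffered=True, **kwargs)
    d.addCallback(check_status)
    d.addCallback(stream_to_file)
    return d
```

Because the chunks are delivered by the reactor rather than pulled from a blocking iterator, nothing here waits on the network in the reactor thread.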

Is there a way to get twisted to download files asynchronously using something full-featured like requests?

Run it in a thread, probably using Twisted's threadpool APIs.
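As a rough sketch of that idea, the whole blocking requests download can be handed to the reactor's thread pool with deferToThread. This assumes both requests and Twisted are installed; save_stream, blocking_download and http_download_threaded are names invented for this example, and the 64 KiB chunk size is an arbitrary choice.

```python
import os


def save_stream(chunks, dst):
    """Write an iterable of byte chunks to dst; return the bytes written."""
    written = 0
    with open(dst, "wb") as f:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                f.write(chunk)
                written += len(chunk)
    return written


def blocking_download(url, dst, chunk_size=64 * 1024, **kwargs):
    """Runs in a worker thread, so blocking in iter_content is harmless."""
    # Third-party import kept local so save_stream works without requests.
    import requests

    response = requests.get(url, stream=True, **kwargs)
    try:
        response.raise_for_status()
        return save_stream(response.iter_content(chunk_size=chunk_size), dst)
    except Exception:
        # Roll back a partial file, as the question's _rollback does.
        if os.path.exists(dst):
            os.remove(dst)
        raise
    finally:
        response.close()


def http_download_threaded(url, dst, **kwargs):
    """Return a Deferred that fires with the byte count when the file is saved."""
    from twisted.internet.threads import deferToThread

    return deferToThread(blocking_download, url, dst, **kwargs)
```

The Deferred's callbacks run back in the reactor thread, so a callback and errback like the ones in the question can be attached to the return value as usual.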

What would the basic approach be to such a problem with twisted, independent of the http features I want to use from requests.

treq.