如何从 SFTPFile 重构 readChunk 以停止使用 inlineCallbacks?

How refactor readChunk from SFTPFile to stop using inlineCallbacks?

我正在尝试通过 ISFTPFile 读取文件,我想避免在这种情况下使用 @inlinceCallbacks

或者也许有更好的方法 read/write for ISFTPFile

@defer.inlineCallbacks
def calculate_checksum(open_file):
    hasher = hashlib.sha256()

    offset = 0
    try:
        while True:
            d = yield open_file.readChunk(offset, chunk_size)
            offset += chunk_size
            hasher.update(d)

    except EOFError:
        pass

    target_checksum = hasher.hexdigest()
    defer.returnValue(target_checksum)


client_file = client.openFile(
    filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)

您实际上想要将 sha256.update 映射到文件块的迭代器上:

hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)
return hasher.hexdigest()

请注意,来自原始 calculate_checksums 的显式迭代(使用 while 循环)现在隐藏在 map 中。基本上,map 已经取代了迭代。

障碍是你想避免 read_those_chunks 将整个文件加载到内存中(大概)。因此,作为第一步,实施该部分:

def read_those_chunks(open_file, chunk_size):
    offset = 0
    while True:
        yield open_file.readChunk(offset, chunk_size)
        offset += chunk_size

有一个生成器可以生成 Deferreds 并随后续块(或 EOFError)一起触发。不幸的是,您不能将它与 map 一起使用。所以现在实现一个可以处理这个的类地图:

def async_map(function, iterable):
    try:
        d = next(iterable)
    except StopIteration:
        return

    d.addCallback(function)
    d.addCallback(lambda ignored: async_map(function, iterable))
    return d

由于 async_map 将替换 map 并且 map 替换了原始实现中的迭代,async_map 仍然负责确保我们访问每个块可迭代的。但是,迭代(使用 forwhile)不能与 Deferred 很好地混合(混合它们通常是在您拉出 inlineCallbacks 时)。所以 async_map 不会迭代。它递归 - 迭代的常见替代方法。每个递归调用都对可迭代对象的下一个元素进行操作,直到没有更多元素为止(或直到​​ Deferred 失败,在本例中由于 EOFError 而发生)。

递归比使用 Deferred 的迭代更有效,因为递归对函数和函数调用进行操作。 Deferred 可以处理函数和函数调用 - 将函数传递给 addCallback 并且 Deferred 最终会调用该函数。迭代由函数的小片段组成(有时称为 "blocks" 或 "suites"),Deferred 无法处理这些片段。您不能将块传递给 addCallback.

现在使用这两个来创建一个 Deferred 在计算摘要时触发:

def calculate_checksum(open_file, chunk_size):
    hasher = hashlib.sha256()
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_map(hasher.update, chunks)
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda ignored: hasher.hexdigest())
    return d

您可能还会注意到 async_mapmap 的不同之处在于它不会生成函数调用的结果列表。也许它更像是 reduce:

def async_reduce(function, iterable, lhs):
    try:
        d = next(iterable)
    except StopIteration:
        return lhs

    d.addCallback(lambda rhs: function(lhs, rhs))
    d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
    return d

当然,它仍然是递归的,而不是迭代的。

计算 hexdigest 的缩减函数如下:

def update_hash(hasher, s):
    hasher.update(s)
    return hasher

因此 calculate_checksum 变为:

def calculate_checksum(open_file, chunk_size):
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_reduce(update_hash, hashlib.sha256(), "")
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda hasher: hasher.hexdigest())
    return d

没有 hasher 闭包更好一些。

当然,还有许多其他方法可以重写此函数来避免inlineCallbacks。我选择的方式并没有消除生成器函数的使用,所以如果这是你想要逃避的,它并没有真正的帮助。如果是这样,也许您可​​以像我在这里所做的那样将问题分解成不同的部分,none 其中涉及一个生成器。