如何从 SFTPFile 重构 readChunk 以停止使用 inlineCallbacks?
How refactor readChunk from SFTPFile to stop using inlineCallbacks?
我正在尝试通过 ISFTPFile
读取文件,我想避免在这种情况下使用 @inlinceCallbacks
?
或者也许有更好的方法 read/write for ISFTPFile
?
@defer.inlineCallbacks
def calculate_checksum(open_file):
hasher = hashlib.sha256()
offset = 0
try:
while True:
d = yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
hasher.update(d)
except EOFError:
pass
target_checksum = hasher.hexdigest()
defer.returnValue(target_checksum)
client_file = client.openFile(
filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)
您实际上想要将 sha256.update 映射到文件块的迭代器上:
hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)
return hasher.hexdigest()
请注意,来自原始 calculate_checksums
的显式迭代(使用 while 循环)现在隐藏在 map
中。基本上,map
已经取代了迭代。
障碍是你想避免 read_those_chunks
将整个文件加载到内存中(大概)。因此,作为第一步,实施该部分:
def read_those_chunks(open_file, chunk_size):
offset = 0
while True:
yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
有一个生成器可以生成 Deferred
s 并随后续块(或 EOFError
)一起触发。不幸的是,您不能将它与 map
一起使用。所以现在实现一个可以处理这个的类地图:
def async_map(function, iterable):
try:
d = next(iterable)
except StopIteration:
return
d.addCallback(function)
d.addCallback(lambda ignored: async_map(function, iterable))
return d
由于 async_map
将替换 map
并且 map
替换了原始实现中的迭代,async_map
仍然负责确保我们访问每个块可迭代的。但是,迭代(使用 for
或 while
)不能与 Deferred
很好地混合(混合它们通常是在您拉出 inlineCallbacks
时)。所以 async_map
不会迭代。它递归 - 迭代的常见替代方法。每个递归调用都对可迭代对象的下一个元素进行操作,直到没有更多元素为止(或直到 Deferred
失败,在本例中由于 EOFError
而发生)。
递归比使用 Deferred
的迭代更有效,因为递归对函数和函数调用进行操作。 Deferred
可以处理函数和函数调用 - 将函数传递给 addCallback
并且 Deferred
最终会调用该函数。迭代由函数的小片段组成(有时称为 "blocks" 或 "suites"),Deferred
无法处理这些片段。您不能将块传递给 addCallback
.
现在使用这两个来创建一个 Deferred
在计算摘要时触发:
def calculate_checksum(open_file, chunk_size):
hasher = hashlib.sha256()
chunks = read_those_chunks(open_file, chunk_size)
d = async_map(hasher.update, chunks)
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda ignored: hasher.hexdigest())
return d
您可能还会注意到 async_map
与 map
的不同之处在于它不会生成函数调用的结果列表。也许它更像是 reduce
:
def async_reduce(function, iterable, lhs):
try:
d = next(iterable)
except StopIteration:
return lhs
d.addCallback(lambda rhs: function(lhs, rhs))
d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
return d
当然,它仍然是递归的,而不是迭代的。
计算 hexdigest 的缩减函数如下:
def update_hash(hasher, s):
hasher.update(s)
return hasher
因此 calculate_checksum
变为:
def calculate_checksum(open_file, chunk_size):
chunks = read_those_chunks(open_file, chunk_size)
d = async_reduce(update_hash, hashlib.sha256(), "")
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda hasher: hasher.hexdigest())
return d
没有 hasher
闭包更好一些。
当然,还有许多其他方法可以重写此函数来避免inlineCallbacks
。我选择的方式并没有消除生成器函数的使用,所以如果这是你想要逃避的,它并没有真正的帮助。如果是这样,也许您可以像我在这里所做的那样将问题分解成不同的部分,none 其中涉及一个生成器。
我正在尝试通过 ISFTPFile
读取文件,我想避免在这种情况下使用 @inlinceCallbacks
?
或者也许有更好的方法 read/write for ISFTPFile
?
@defer.inlineCallbacks
def calculate_checksum(open_file):
hasher = hashlib.sha256()
offset = 0
try:
while True:
d = yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
hasher.update(d)
except EOFError:
pass
target_checksum = hasher.hexdigest()
defer.returnValue(target_checksum)
client_file = client.openFile(
filename=target, flags=FXF_READ, attrs={})
checksum = yield client_file.addCallback(calculate_checksum)
您实际上想要将 sha256.update 映射到文件块的迭代器上:
hasher = hashlib.sha256()
chunks = read_those_chunks()
map(hasher.update, chunks)
return hasher.hexdigest()
请注意,来自原始 calculate_checksums
的显式迭代(使用 while 循环)现在隐藏在 map
中。基本上,map
已经取代了迭代。
障碍是你想避免 read_those_chunks
将整个文件加载到内存中(大概)。因此,作为第一步,实施该部分:
def read_those_chunks(open_file, chunk_size):
offset = 0
while True:
yield open_file.readChunk(offset, chunk_size)
offset += chunk_size
有一个生成器可以生成 Deferred
s 并随后续块(或 EOFError
)一起触发。不幸的是,您不能将它与 map
一起使用。所以现在实现一个可以处理这个的类地图:
def async_map(function, iterable):
try:
d = next(iterable)
except StopIteration:
return
d.addCallback(function)
d.addCallback(lambda ignored: async_map(function, iterable))
return d
由于 async_map
将替换 map
并且 map
替换了原始实现中的迭代,async_map
仍然负责确保我们访问每个块可迭代的。但是,迭代(使用 for
或 while
)不能与 Deferred
很好地混合(混合它们通常是在您拉出 inlineCallbacks
时)。所以 async_map
不会迭代。它递归 - 迭代的常见替代方法。每个递归调用都对可迭代对象的下一个元素进行操作,直到没有更多元素为止(或直到 Deferred
失败,在本例中由于 EOFError
而发生)。
递归比使用 Deferred
的迭代更有效,因为递归对函数和函数调用进行操作。 Deferred
可以处理函数和函数调用 - 将函数传递给 addCallback
并且 Deferred
最终会调用该函数。迭代由函数的小片段组成(有时称为 "blocks" 或 "suites"),Deferred
无法处理这些片段。您不能将块传递给 addCallback
.
现在使用这两个来创建一个 Deferred
在计算摘要时触发:
def calculate_checksum(open_file, chunk_size):
hasher = hashlib.sha256()
chunks = read_those_chunks(open_file, chunk_size)
d = async_map(hasher.update, chunks)
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda ignored: hasher.hexdigest())
return d
您可能还会注意到 async_map
与 map
的不同之处在于它不会生成函数调用的结果列表。也许它更像是 reduce
:
def async_reduce(function, iterable, lhs):
try:
d = next(iterable)
except StopIteration:
return lhs
d.addCallback(lambda rhs: function(lhs, rhs))
d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
return d
当然,它仍然是递归的,而不是迭代的。
计算 hexdigest 的缩减函数如下:
def update_hash(hasher, s):
hasher.update(s)
return hasher
因此 calculate_checksum
变为:
def calculate_checksum(open_file, chunk_size):
chunks = read_those_chunks(open_file, chunk_size)
d = async_reduce(update_hash, hashlib.sha256(), "")
d.addErrback(lambda err: err.trap(EOFError))
d.addCallback(lambda hasher: hasher.hexdigest())
return d
没有 hasher
闭包更好一些。
当然,还有许多其他方法可以重写此函数来避免inlineCallbacks
。我选择的方式并没有消除生成器函数的使用,所以如果这是你想要逃避的,它并没有真正的帮助。如果是这样,也许您可以像我在这里所做的那样将问题分解成不同的部分,none 其中涉及一个生成器。