在 Twistd 中无阻塞地压缩文件。
Zipping files in Twistd without blocking.
有没有办法在 twisted 中不阻塞地压缩文件?
import zipfile
from twisted.internet import defer
from twisted.internet import reactor
def zip_files(file_list, path, output_Zip):
zip_handle = zipfile.ZipFile(output_zip, mode='w', allowZip64=True)
try:
for i in file_list:
zip_handle.write(i)
zip_handle.close()
return True
except Exception as e:
return False
def print_zip(res):
print res
return res
file_list = ['path_to_file1','path_to_file2']
output_path = 'full_path_to_output_zip'
d = defer.Deferred()
d.addCallback(lambda _: zip_files(file_list, output_path)
d.addCallback(print_zip)
zip_result = d
reactor.run()
到目前为止我已经有了这个。虽然它确实有效,但触发压缩过程会导致扭曲阻塞并等待初始 'zip job' 完成。我宁愿它终止现有的 'zip job' 并启动新的。
也许是这样的,使用 DeferredList
的 deferToThread
来不阻塞写入 zip 文件:
import zipfile
import logging
from twisted.internet import threads, defer
from twisted.internet import reactor
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
def zip_file(input_path, output_path):
with zipfile.ZipFile(output_path, mode='w', allowZip64=True) as zip_handle:
zip_handle.write(input_path)
def log_failure(err):
log.exception("error: %s", err)
def zip_file_and_catch_error(input_path, output_path):
d = threads.deferToThread(zip_file, input_path, output_path)
d.addErrback(log_failure)
return d
def main():
input_paths = ['path_to_file1','path_to_file2']
output_paths = ['path_out1','path_out2']
assert len(input_paths) == len(output_paths)
dl = defer.DeferredList([zip_file_and_catch_error(input_path, output_path)
for input_path, output_path in zip(input_paths, output_paths)])
dl.addCallback(lambda result: log.info("result: %s", result))
dl.addBoth(lambda _: reactor.callLater(0, reactor.stop))
reactor.run()
if __name__ == "__main__":
main()
import zipfile
from twisted.internet import defer, reactor
def main():
file_list = ['path_to_file1','path_to_file2']
output_path = 'full_path_to_output.zip'
zip_obj = zipfile.ZipFile(output_path, mode='w', allowZip64=True)
d = zip_files(zip_obj, file_list)
d.addCallback(handle_success)
d.addErrback(handle_error)
d.addBoth(close_zip_obj, zip_obj = zip_obj)
@defer.inlineCallbacks
def zip_files(zip_obj, file_list):
for item in file_list:
yield zip_obj.write(item)
# handle "interrupts" here
def handle_success(ignore):
print('Done zipping')
def handle_error(failure):
print('Error: {0}'.format(failure.value))
def close_zip_obj(ignore, zip_obj):
print('Closing zip object')
zip_obj.close()
main()
reactor.run()
我试图让我的示例非常简单,这样 Twisted 的新手就不会感到困惑。 ZipFile
对象在外部创建并传递给 zip_files()
(现在用 @inlineCallbacks
和 returns 和 'Deferred' 装饰)这样可以很容易地访问它,如果必要的。处理成功和错误回调(通过 addCallback/addErrback
)更新这些函数以满足您的需要。最后,从 main 函数传递到 close_zip_obj()
的 ZipFile
对象在压缩完成后关闭。这应该可以相当快地处理大量中等大小的文件。对于大文件,您 "should" 可以使用原始代码在 deferToThread
中执行任务。
但是,您发表的评论非常含糊:
Id rather it terminate the existing 'zip job' and start the new one.
这假设如果您正在 zip 中,您想停止当前的 zip 并开始另一个 zip。使用 deferToThread
或任何线程方法,在线程之间传递标志、set/unset 锁以及与其他线程同步变得乏味。如果您决定使用线程,请记住这一点。
有没有办法在 twisted 中不阻塞地压缩文件?
import zipfile
from twisted.internet import defer
from twisted.internet import reactor
def zip_files(file_list, path, output_Zip):
zip_handle = zipfile.ZipFile(output_zip, mode='w', allowZip64=True)
try:
for i in file_list:
zip_handle.write(i)
zip_handle.close()
return True
except Exception as e:
return False
def print_zip(res):
print res
return res
file_list = ['path_to_file1','path_to_file2']
output_path = 'full_path_to_output_zip'
d = defer.Deferred()
d.addCallback(lambda _: zip_files(file_list, output_path)
d.addCallback(print_zip)
zip_result = d
reactor.run()
到目前为止我已经有了这个。虽然它确实有效,但触发压缩过程会导致扭曲阻塞并等待初始 'zip job' 完成。我宁愿它终止现有的 'zip job' 并启动新的。
也许是这样的,使用 DeferredList
的 deferToThread
来不阻塞写入 zip 文件:
import zipfile
import logging
from twisted.internet import threads, defer
from twisted.internet import reactor
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
def zip_file(input_path, output_path):
with zipfile.ZipFile(output_path, mode='w', allowZip64=True) as zip_handle:
zip_handle.write(input_path)
def log_failure(err):
log.exception("error: %s", err)
def zip_file_and_catch_error(input_path, output_path):
d = threads.deferToThread(zip_file, input_path, output_path)
d.addErrback(log_failure)
return d
def main():
input_paths = ['path_to_file1','path_to_file2']
output_paths = ['path_out1','path_out2']
assert len(input_paths) == len(output_paths)
dl = defer.DeferredList([zip_file_and_catch_error(input_path, output_path)
for input_path, output_path in zip(input_paths, output_paths)])
dl.addCallback(lambda result: log.info("result: %s", result))
dl.addBoth(lambda _: reactor.callLater(0, reactor.stop))
reactor.run()
if __name__ == "__main__":
main()
import zipfile
from twisted.internet import defer, reactor
def main():
file_list = ['path_to_file1','path_to_file2']
output_path = 'full_path_to_output.zip'
zip_obj = zipfile.ZipFile(output_path, mode='w', allowZip64=True)
d = zip_files(zip_obj, file_list)
d.addCallback(handle_success)
d.addErrback(handle_error)
d.addBoth(close_zip_obj, zip_obj = zip_obj)
@defer.inlineCallbacks
def zip_files(zip_obj, file_list):
for item in file_list:
yield zip_obj.write(item)
# handle "interrupts" here
def handle_success(ignore):
print('Done zipping')
def handle_error(failure):
print('Error: {0}'.format(failure.value))
def close_zip_obj(ignore, zip_obj):
print('Closing zip object')
zip_obj.close()
main()
reactor.run()
我试图让我的示例非常简单,这样 Twisted 的新手就不会感到困惑。 ZipFile
对象在外部创建并传递给 zip_files()
(现在用 @inlineCallbacks
和 returns 和 'Deferred' 装饰)这样可以很容易地访问它,如果必要的。处理成功和错误回调(通过 addCallback/addErrback
)更新这些函数以满足您的需要。最后,从 main 函数传递到 close_zip_obj()
的 ZipFile
对象在压缩完成后关闭。这应该可以相当快地处理大量中等大小的文件。对于大文件,您 "should" 可以使用原始代码在 deferToThread
中执行任务。
但是,您发表的评论非常含糊:
Id rather it terminate the existing 'zip job' and start the new one.
这假设如果您正在 zip 中,您想停止当前的 zip 并开始另一个 zip。使用 deferToThread
或任何线程方法,在线程之间传递标志、set/unset 锁以及与其他线程同步变得乏味。如果您决定使用线程,请记住这一点。