python 中的大文件并发下载和处理
Concurrent download and processing of large files in python
我有一个大文件的 URL 列表下载(例如压缩档案),我想处理(例如解压档案)。
下载和处理都需要很长时间,而且磁盘 IO 处理很繁重,所以我想 一次 运行 每个。由于这两个任务花费的时间大致相同并且不会竞争相同的资源,因此我想在处理最后一个文件时下载下一个文件。
这是 producer-consumer problem 的变体。
情况与reading and processing images or downloading loads of files类似,但我的下载程序调用(还)不是可挑选的,所以我无法使用多处理,并且两个任务花费的时间大致相同。
这是一个虚拟示例,其中下载和处理都是阻塞的:
import time
import posixpath
def download(urls):
for url in urls:
time.sleep(3) # this is the download (more like 1000s)
yield posixpath.basename(url)
def process(fname):
time.sleep(2) # this is the processing part (more like 600s)
urls = ['a', 'b', 'c']
for fname in download(urls):
process(fname)
print(fname)
如何使这两个任务并发?我可以将 yield
或 yield from
in a smart way, perhaps in combination with deque
? Or must it be asyncio
与 Future
一起使用吗?
我会简单地使用 threading.Thread(target=process, args=(fname,))
并启动一个新线程进行处理。
但在此之前,结束最后一个处理线程:
t = None
for fname in download(urls):
if t is not None: # wait for last processing thread to end
t.join()
t = threading.Thread(target=process, args=(fname,))
t.start()
print('[i] thread started for %s' % fname)
我有一个大文件的 URL 列表下载(例如压缩档案),我想处理(例如解压档案)。
下载和处理都需要很长时间,而且磁盘 IO 处理很繁重,所以我想 一次 运行 每个。由于这两个任务花费的时间大致相同并且不会竞争相同的资源,因此我想在处理最后一个文件时下载下一个文件。
这是 producer-consumer problem 的变体。
情况与reading and processing images or downloading loads of files类似,但我的下载程序调用(还)不是可挑选的,所以我无法使用多处理,并且两个任务花费的时间大致相同。
这是一个虚拟示例,其中下载和处理都是阻塞的:
import time
import posixpath
def download(urls):
for url in urls:
time.sleep(3) # this is the download (more like 1000s)
yield posixpath.basename(url)
def process(fname):
time.sleep(2) # this is the processing part (more like 600s)
urls = ['a', 'b', 'c']
for fname in download(urls):
process(fname)
print(fname)
如何使这两个任务并发?我可以将 yield
或 yield from
in a smart way, perhaps in combination with deque
? Or must it be asyncio
与 Future
一起使用吗?
我会简单地使用 threading.Thread(target=process, args=(fname,))
并启动一个新线程进行处理。
但在此之前,结束最后一个处理线程:
t = None
for fname in download(urls):
if t is not None: # wait for last processing thread to end
t.join()
t = threading.Thread(target=process, args=(fname,))
t.start()
print('[i] thread started for %s' % fname)