结合 Python 的 concurrent.futures 中的 ThreadPoolExecutor 和 ProcessPoolExecutor
Combine ThreadPoolExecutor and ProcessPoolExecutor from Python's concurrent.futures
我必须下载许多压缩的 bz2 文件并解压缩它们以进行进一步处理。
下载是I/O绑定,解压是CPU绑定。所以我认为我最好将 ThreadPoolExecutor
与 ProcessPoolExecutor
结合起来。需要明确的是:我不想等到所有文件都下载完毕再解压缩。我宁愿在其他文件仍在下载时使用我的 CPU 资源。我读了 this thread,但它对我来说似乎没有用。
我有这个代码:
import bz2
import requests
from concurrent import futures
from io import BytesIO
class Source:
    """One downloadable bz2 resource: its URL plus the compressed and
    decompressed payloads, filled in by download() / unzip()."""

    def __init__(self, url):
        self.url = url
        self.compressed = None  # raw bz2 bytes, set by download()
        self.binary = None      # decompressed bytes, set by unzip()

    def download(self):
        """Fetch the compressed payload (I/O-bound). Returns self for chaining."""
        print(f'Start downloading {self.url}')
        req = requests.get(self.url, timeout=5)
        # Fail loudly on HTTP errors (4xx/5xx) instead of silently storing an
        # error page that bz2 would later choke on in unzip().
        req.raise_for_status()
        self.compressed = req.content
        print(f'Finished downloading {self.url}')
        return self

    def unzip(self):
        """Decompress the downloaded payload (CPU-bound). Returns self for chaining."""
        print(f'Start unzipping {self.url}')
        with bz2.open(BytesIO(self.compressed), 'rb') as file:
            self.binary = file.read()
        print(f'Finished unzipping {self.url}')
        return self
# Overlap the two stages: threads handle the I/O-bound downloads while the
# process pool starts unzipping (CPU-bound) each file the moment its download
# finishes -- no waiting for the whole batch.
list_sources_init = [Source(url) for url in list_urls]

with futures.ThreadPoolExecutor() as executor_threads, futures.ProcessPoolExecutor() as executor_processes:
    list_futures_after_download = []
    for source in list_sources_init:
        list_futures_after_download.append(executor_threads.submit(source.download))

    # as_completed yields downloads in finish order, so unzip work is
    # dispatched as soon as each payload is available.
    list_futures_after_unzip = [
        executor_processes.submit(future.result().unzip)
        for future in futures.as_completed(list_futures_after_download)
    ]

    list_sources_unzipped = []
    for future in list_futures_after_unzip:
        list_sources_unzipped.append(future.result())
这行得通,但似乎有点可疑。此外,我想知道为什么 list_sources_init
中的元素没有被下载。首先,我打算只处理这个列表并对它的元素执行并行操作。现在我得到了 3 个列表,部分包含相同的数据。最痛苦的是,压缩数据存储在 list_futures_after_download
以及 list_futures_after_unzip 中。
我想有更好的方法来做到这一点。但是怎么办?
这样更简洁。我认为为此使用线程没有任何好处:
from multiprocessing import Pool, freeze_support
import requests
import bz2
from io import BytesIO
URL_LIST = []
def processURL(url):
    """Download one bz2 file and return its decompressed bytes.

    Returns None when the download or decompression fails. The failure is
    logged instead of being silently swallowed (the original broad
    ``except Exception: pass`` hid every error, including programming bugs).
    """
    try:
        with requests.Session() as session:
            r = session.get(url, timeout=5)
            r.raise_for_status()
        with bz2.open(BytesIO(r.content), 'rb') as data:
            return data.read()
    except (requests.RequestException, OSError, EOFError) as exc:
        # RequestException covers HTTP/network errors; OSError/EOFError cover
        # invalid or truncated bz2 streams.
        print(f'Failed to process {url}: {exc}')
        return None
def main():
    """Fan URL_LIST out over a process pool and print each result."""
    with Pool() as pool:
        for result in pool.map(processURL, URL_LIST):
            print(result)
if __name__ == '__main__':
    # freeze_support() is required on frozen Windows executables because
    # multiprocessing re-imports this module in child processes; it is a
    # no-op elsewhere.
    freeze_support()
    main()
我必须下载许多压缩的 bz2 文件并解压缩它们以进行进一步处理。
下载是I/O绑定,解压是CPU绑定。所以我认为我最好将 ThreadPoolExecutor
与 ProcessPoolExecutor
结合起来。需要明确的是:我不想等到所有文件都下载完毕再解压缩。我宁愿在其他文件仍在下载时使用我的 CPU 资源。我读了 this thread,但它对我来说似乎没有用。
我有这个代码:
import bz2
import requests
from concurrent import futures
from io import BytesIO
class Source:
    """One downloadable bz2 resource: its URL plus the compressed and
    decompressed payloads, filled in by download() / unzip()."""

    def __init__(self, url):
        self.url = url
        self.compressed = None  # raw bz2 bytes, set by download()
        self.binary = None      # decompressed bytes, set by unzip()

    def download(self):
        """Fetch the compressed payload (I/O-bound). Returns self for chaining."""
        print(f'Start downloading {self.url}')
        req = requests.get(self.url, timeout=5)
        # Fail loudly on HTTP errors (4xx/5xx) instead of silently storing an
        # error page that bz2 would later choke on in unzip().
        req.raise_for_status()
        self.compressed = req.content
        print(f'Finished downloading {self.url}')
        return self

    def unzip(self):
        """Decompress the downloaded payload (CPU-bound). Returns self for chaining."""
        print(f'Start unzipping {self.url}')
        with bz2.open(BytesIO(self.compressed), 'rb') as file:
            self.binary = file.read()
        print(f'Finished unzipping {self.url}')
        return self
# Overlap the two stages: threads handle the I/O-bound downloads while the
# process pool starts unzipping (CPU-bound) each file the moment its download
# finishes -- no waiting for the whole batch.
list_sources_init = [Source(url) for url in list_urls]

with futures.ThreadPoolExecutor() as executor_threads, futures.ProcessPoolExecutor() as executor_processes:
    list_futures_after_download = []
    for source in list_sources_init:
        list_futures_after_download.append(executor_threads.submit(source.download))

    # as_completed yields downloads in finish order, so unzip work is
    # dispatched as soon as each payload is available.
    list_futures_after_unzip = [
        executor_processes.submit(future.result().unzip)
        for future in futures.as_completed(list_futures_after_download)
    ]

    list_sources_unzipped = []
    for future in list_futures_after_unzip:
        list_sources_unzipped.append(future.result())
这行得通,但似乎有点可疑。此外,我想知道为什么 list_sources_init
中的元素没有被下载。首先,我打算只处理这个列表并对它的元素执行并行操作。现在我得到了 3 个列表,部分包含相同的数据。最痛苦的是,压缩数据存储在 list_futures_after_download
以及 list_futures_after_unzip 中。
我想有更好的方法来做到这一点。但是怎么办?
这样更简洁。我认为为此使用线程没有任何好处:
from multiprocessing import Pool, freeze_support
import requests
import bz2
from io import BytesIO
URL_LIST = []
def processURL(url):
    """Download one bz2 file and return its decompressed bytes.

    Returns None when the download or decompression fails. The failure is
    logged instead of being silently swallowed (the original broad
    ``except Exception: pass`` hid every error, including programming bugs).
    """
    try:
        with requests.Session() as session:
            r = session.get(url, timeout=5)
            r.raise_for_status()
        with bz2.open(BytesIO(r.content), 'rb') as data:
            return data.read()
    except (requests.RequestException, OSError, EOFError) as exc:
        # RequestException covers HTTP/network errors; OSError/EOFError cover
        # invalid or truncated bz2 streams.
        print(f'Failed to process {url}: {exc}')
        return None
def main():
    """Fan URL_LIST out over a process pool and print each result."""
    with Pool() as pool:
        for result in pool.map(processURL, URL_LIST):
            print(result)
if __name__ == '__main__':
    # freeze_support() is required on frozen Windows executables because
    # multiprocessing re-imports this module in child processes; it is a
    # no-op elsewhere.
    freeze_support()
    main()