多处理进度条
Progress bar with multiprocessing
我使用 multiprocessing 包来 运行 函数:run_performance
,它加载 zip 文件,其中包含几个 csv 文件。
我搜索以正确显示每个 zip 文件中的 csv 数量的进度条。
使用我的代码,显示为 incoherent/wrong:
我的代码:
from alive_progress import alive_bar
from zipfile import ZipFile
import os
def get_filepaths(directory):
file_paths = [] # List which will store all of the full filepaths.
# Walk the tree.
for root, directories, files in os.walk(directory):
for filename in files:
# Join the two strings in order to form the full filepath.
filepath = os.path.join(root, filename)
file_paths.append(filepath) # Add it to the list.
return file_paths # Self-explanatory.
def count_files_7z(myarchive):
cnt_files = []
with closing(ZipFile(myarchive)) as archive:
for csv in archive.namelist():
cnt_files.append(csv)
return cnt_files
def run_performance(zipobj):
zf = zipfile.ZipFile(zipobj)
cnt = count_files_7z(zipobj)
with alive_bar(len(cnt)) as bar:
for f in zf.namelist():
bar()
with zf.open(f) as myfile:
print(myfile) # and done other things
list_dir = "path_of_zipfiles" #
for idx1, folder in enumerate(list_dir):
get_all_zips = get_filepaths(folder)
for idx2, zip_file in enumerate(get_all_zips):
with zipfile.ZipFile(zip_file) as zipobj:
p = Process(target=run_performance,args=(zipobj.filename,))
p.start()
p.join()
我的显示:
|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s
...
如果我将行 p.join()
放置为与 p.start()
相同的缩进,显示是正确的,但多处理不再工作。
所以脚本太耗时了:
1m18s 与 0m14s
期望的输出:
|████████████████████████████████████████| 1/1 [100%] in 2.4s (0.41/s)
|████████████████████████████████████████| 2/2 [100%] in 4.7s (0.43/s)
|████████████████████ | ▄▂▂ 1/2 [50%] in 2s (0.6/s, eta: 0s)
似乎 alive_bar
记得调用时光标的位置,并从该点开始绘制条形图。当您启动多个进程时,每个进程都不知道另一个进程,输出会变得混乱。
确实,github 中有一个关于此的未解决问题(参见 here)。有一些使用多线程的 hacky 解决方案,但我认为使用多处理解决它并不容易,除非你在进程间通信上实现某种会减慢速度的东西。
首先是关于您的代码的一些一般性评论。在您的主要进程中,您使用文件路径打开 zip 存档只是为了取回原始文件名。这真的没有太大意义。然后在 count_files_7z
中迭代 zf.namelist()
中的 return 值以在 zf.namelist()
已经是这些文件的列表时构建存档中的文件列表。这也没有多大意义。您还使用上下文管理器功能 closing
来确保存档在块的末尾关闭,但是 with
块本身是一个上下文管理器,具有相同的目的。
我试过安装alive-progress,进度条乱七八糟。这是一项更适合多线程而不是多处理的任务。实际上,它可能更适合串行处理,因为对磁盘进行并发 I/O 操作,除非它是固态驱动器,否则可能会损害性能。如果您读取的文件涉及繁重的 CPU-intensive 处理,您将获得性能提升。如果是这种情况,我已将一个多处理池传递给每个线程,您可以在其中执行对 apply
的调用,并指定已放置 CPU-intensive 代码的函数。但是在多线程而不是多处理下完成时,进度条应该会更好地工作。即使那样,我也无法使用 alive-progress 获得任何体面的显示,诚然我没有花太多时间。所以我转而使用更常见的 tqdm 模块,可从 PyPi 存储库获得。
即使使用 tqdm 也有一个问题,当进度条达到 100% 时,tqdm 一定是在写东西(一个换行符?)重新定位其他进度条。因此,我所做的是指定 leave=False,这会导致条形图在达到 100% 时消失。但至少你可以看到所有的进度条在进行时没有失真。
from multiprocessing.pool import Pool, ThreadPool
from threading import Lock
import tqdm
from zipfile import ZipFile
import os
import heapq
def get_filepaths(directory):
file_paths = [] # List which will store all of the full filepaths.
# Walk the tree.
for root, directories, files in os.walk(directory):
for filename in files:
# Join the two strings in order to form the full filepath.
filepath = os.path.join(root, filename)
file_paths.append(filepath) # Add it to the list.
return file_paths # Self-explanatory.
def get_free_position():
""" Return the minimum possible position """
with lock:
free_position = heapq.heappop(free_positions)
return free_position
def return_free_position(position):
with lock:
heapq.heappush(free_positions, position)
def run_performance(zip_file):
position = get_free_position()
with ZipFile(zip_file) as zf:
file_list = zf.namelist()
with tqdm.tqdm(total=len(file_list), position=position, leave=False) as bar:
for f in file_list:
with zf.open(f) as myfile:
... # do things with myfile (perhaps myfile.read())
# for CPU-intensive tasks: result = pool.apply(some_function, args=(arg1, arg2, ... argn))
import time
time.sleep(.005) # simulate doing something
bar.update()
return_free_position(position)
def generate_zip_files():
list_dir = ['path1', 'path2']
for folder in list_dir:
get_all_zips = get_filepaths(folder)
for zip_file in get_all_zips:
yield zip_file
# Required for Windows:
if __name__ == '__main__':
N_THREADS = 5
free_positions = list(range(N_THREADS)) # already a heap
lock = Lock()
pool = Pool()
thread_pool = ThreadPool(N_THREADS)
for result in thread_pool.imap_unordered(run_performance, generate_zip_files()):
pass
pool.close()
pool.join()
thread_pool.close()
thread_pool.join()
上面的代码使用了一个任意大小限制为 5 的多处理线程池作为演示。您可以将 N_THREADS
增加或减少到您想要的任何值,但正如我所说,它可能会或可能不会帮助提高性能。如果您希望每个 zip 文件一个线程,则:
if __name__ == '__main__':
zip_files = list(generate_zip_files())
N_THREADS = len(zip_files)
free_positions = list(range(N_THREADS)) # already a heap
lock = Lock()
pool = Pool()
thread_pool = ThreadPool(N_THREADS)
for result in thread_pool.imap_unordered(run_performance, zip_files):
pass
pool.close()
pool.join()
thread_pool.close()
thread_pool.join()
在 Enlighten codebase there is an example 类似的东西。您只需将 process_files()
函数替换为您自己的函数即可。
在这里重新创建有点大,但想法是您实际上应该只在主进程中进行控制台输出,并使用某种形式的 IPC 来中继来自子进程的信息。 Enlighten 示例使用 IPC 队列,这是非常合理的,因为它只发送当前计数。
我使用 multiprocessing 包来 运行 函数:run_performance
,它加载 zip 文件,其中包含几个 csv 文件。
我搜索以正确显示每个 zip 文件中的 csv 数量的进度条。 使用我的代码,显示为 incoherent/wrong:
我的代码:
from alive_progress import alive_bar
from zipfile import ZipFile
import os
def get_filepaths(directory):
file_paths = [] # List which will store all of the full filepaths.
# Walk the tree.
for root, directories, files in os.walk(directory):
for filename in files:
# Join the two strings in order to form the full filepath.
filepath = os.path.join(root, filename)
file_paths.append(filepath) # Add it to the list.
return file_paths # Self-explanatory.
def count_files_7z(myarchive):
cnt_files = []
with closing(ZipFile(myarchive)) as archive:
for csv in archive.namelist():
cnt_files.append(csv)
return cnt_files
def run_performance(zipobj):
zf = zipfile.ZipFile(zipobj)
cnt = count_files_7z(zipobj)
with alive_bar(len(cnt)) as bar:
for f in zf.namelist():
bar()
with zf.open(f) as myfile:
print(myfile) # and done other things
list_dir = "path_of_zipfiles" #
for idx1, folder in enumerate(list_dir):
get_all_zips = get_filepaths(folder)
for idx2, zip_file in enumerate(get_all_zips):
with zipfile.ZipFile(zip_file) as zipobj:
p = Process(target=run_performance,args=(zipobj.filename,))
p.start()
p.join()
我的显示:
|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌ | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s
...
如果我将行 p.join()
放置为与 p.start()
相同的缩进,显示是正确的,但多处理不再工作。
所以脚本太耗时了:
1m18s 与 0m14s
期望的输出:
|████████████████████████████████████████| 1/1 [100%] in 2.4s (0.41/s)
|████████████████████████████████████████| 2/2 [100%] in 4.7s (0.43/s)
|████████████████████ | ▄▂▂ 1/2 [50%] in 2s (0.6/s, eta: 0s)
似乎 alive_bar
记得调用时光标的位置,并从该点开始绘制条形图。当您启动多个进程时,每个进程都不知道另一个进程,输出会变得混乱。
确实,github 中有一个关于此的未解决问题(参见 here)。有一些使用多线程的 hacky 解决方案,但我认为使用多处理解决它并不容易,除非你在进程间通信上实现某种会减慢速度的东西。
首先是关于您的代码的一些一般性评论。在您的主要进程中,您使用文件路径打开 zip 存档只是为了取回原始文件名。这真的没有太大意义。然后在 count_files_7z
中迭代 zf.namelist()
中的 return 值以在 zf.namelist()
已经是这些文件的列表时构建存档中的文件列表。这也没有多大意义。您还使用上下文管理器功能 closing
来确保存档在块的末尾关闭,但是 with
块本身是一个上下文管理器,具有相同的目的。
我试过安装alive-progress,进度条乱七八糟。这是一项更适合多线程而不是多处理的任务。实际上,它可能更适合串行处理,因为对磁盘进行并发 I/O 操作,除非它是固态驱动器,否则可能会损害性能。如果您读取的文件涉及繁重的 CPU-intensive 处理,您将获得性能提升。如果是这种情况,我已将一个多处理池传递给每个线程,您可以在其中执行对 apply
的调用,并指定已放置 CPU-intensive 代码的函数。但是在多线程而不是多处理下完成时,进度条应该会更好地工作。即使那样,我也无法使用 alive-progress 获得任何体面的显示,诚然我没有花太多时间。所以我转而使用更常见的 tqdm 模块,可从 PyPi 存储库获得。
即使使用 tqdm 也有一个问题,当进度条达到 100% 时,tqdm 一定是在写东西(一个换行符?)重新定位其他进度条。因此,我所做的是指定 leave=False,这会导致条形图在达到 100% 时消失。但至少你可以看到所有的进度条在进行时没有失真。
from multiprocessing.pool import Pool, ThreadPool
from threading import Lock
import tqdm
from zipfile import ZipFile
import os
import heapq
def get_filepaths(directory):
file_paths = [] # List which will store all of the full filepaths.
# Walk the tree.
for root, directories, files in os.walk(directory):
for filename in files:
# Join the two strings in order to form the full filepath.
filepath = os.path.join(root, filename)
file_paths.append(filepath) # Add it to the list.
return file_paths # Self-explanatory.
def get_free_position():
""" Return the minimum possible position """
with lock:
free_position = heapq.heappop(free_positions)
return free_position
def return_free_position(position):
with lock:
heapq.heappush(free_positions, position)
def run_performance(zip_file):
position = get_free_position()
with ZipFile(zip_file) as zf:
file_list = zf.namelist()
with tqdm.tqdm(total=len(file_list), position=position, leave=False) as bar:
for f in file_list:
with zf.open(f) as myfile:
... # do things with myfile (perhaps myfile.read())
# for CPU-intensive tasks: result = pool.apply(some_function, args=(arg1, arg2, ... argn))
import time
time.sleep(.005) # simulate doing something
bar.update()
return_free_position(position)
def generate_zip_files():
list_dir = ['path1', 'path2']
for folder in list_dir:
get_all_zips = get_filepaths(folder)
for zip_file in get_all_zips:
yield zip_file
# Required for Windows:
if __name__ == '__main__':
N_THREADS = 5
free_positions = list(range(N_THREADS)) # already a heap
lock = Lock()
pool = Pool()
thread_pool = ThreadPool(N_THREADS)
for result in thread_pool.imap_unordered(run_performance, generate_zip_files()):
pass
pool.close()
pool.join()
thread_pool.close()
thread_pool.join()
上面的代码使用了一个任意大小限制为 5 的多处理线程池作为演示。您可以将 N_THREADS
增加或减少到您想要的任何值,但正如我所说,它可能会或可能不会帮助提高性能。如果您希望每个 zip 文件一个线程,则:
if __name__ == '__main__':
zip_files = list(generate_zip_files())
N_THREADS = len(zip_files)
free_positions = list(range(N_THREADS)) # already a heap
lock = Lock()
pool = Pool()
thread_pool = ThreadPool(N_THREADS)
for result in thread_pool.imap_unordered(run_performance, zip_files):
pass
pool.close()
pool.join()
thread_pool.close()
thread_pool.join()
在 Enlighten codebase there is an example 类似的东西。您只需将 process_files()
函数替换为您自己的函数即可。
在这里重新创建有点大,但想法是您实际上应该只在主进程中进行控制台输出,并使用某种形式的 IPC 来中继来自子进程的信息。 Enlighten 示例使用 IPC 队列,这是非常合理的,因为它只发送当前计数。