多处理进度条

Progress bar with multiprocessing

我使用 multiprocessing 包来 运行 函数:run_performance,它加载 zip 文件,其中包含几个 csv 文件。

我搜索以正确显示每个 zip 文件中的 csv 数量的进度条。 使用我的代码,显示为 incoherent/wrong:

我的代码:

from alive_progress import alive_bar
from zipfile import ZipFile
import os

def get_filepaths(directory):
    file_paths = []  # List which will store all of the full filepaths.
    # Walk the tree.
    for root, directories, files in os.walk(directory):
        for filename in files:
            # Join the two strings in order to form the full filepath.
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)  # Add it to the list.
    return file_paths  # Self-explanatory.

def count_files_7z(myarchive):
   cnt_files = []
   with closing(ZipFile(myarchive)) as archive:
      for csv in archive.namelist():
         cnt_files.append(csv)
      return cnt_files

def run_performance(zipobj):
   zf = zipfile.ZipFile(zipobj)
   cnt = count_files_7z(zipobj)
   with alive_bar(len(cnt)) as bar:
      for f in zf.namelist():
         bar()
         with zf.open(f) as myfile:
            print(myfile) # and done other things

list_dir = "path_of_zipfiles" #

 for idx1, folder in enumerate(list_dir):
    get_all_zips = get_filepaths(folder)
    for idx2, zip_file in enumerate(get_all_zips):
       with zipfile.ZipFile(zip_file) as zipobj:
          p = Process(target=run_performance,args=(zipobj.filename,))
          p.start()
     p.join()

我的显示:

|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s
...

如果我将行 p.join() 放置为与 p.start() 相同的缩进,显示是正确的,但多处理不再工作。 所以脚本太耗时了:

1m18s 与 0m14s

期望的输出:

|████████████████████████████████████████| 1/1 [100%] in 2.4s (0.41/s)
|████████████████████████████████████████| 2/2 [100%] in 4.7s (0.43/s)
|████████████████████                    | ▄▂▂ 1/2 [50%] in 2s (0.6/s, eta: 0s)

似乎 alive_bar 记得调用时光标的位置,并从该点开始绘制条形图。当您启动多个进程时,每个进程都不知道另一个进程,输出会变得混乱。

确实,github 中有一个关于此的未解决问题(参见 here)。有一些使用多线程的 hacky 解决方案,但我认为使用多处理解决它并不容易,除非你在进程间通信上实现某种会减慢速度的东西。

首先是关于您的代码的一些一般性评论。在您的主要进程中,您使用文件路径打开 zip 存档只是为了取回原始文件名。这真的没有太大意义。然后在 count_files_7z 中迭代 zf.namelist() 中的 return 值以在 zf.namelist() 已经是这些文件的列表时构建存档中的文件列表。这也没有多大意义。您还使用上下文管理器功能 closing 来确保存档在块的末尾关闭,但是 with 块本身是一个上下文管理器,具有相同的目的。

我试过安装alive-progress,进度条乱七八糟。这是一项更适合多线程而不是多处理的任务。实际上,它可能更适合串行处理,因为对磁盘进行并发 I/O 操作,除非它是固态驱动器,否则可能会损害性能。如果您读取的文件涉及繁重的 CPU-intensive 处理,您将获得性能提升。如果是这种情况,我已将一个多处理池传递给每个线程,您可以在其中执行对 apply 的调用,并指定已放置 CPU-intensive 代码的函数。但是在多线程而不是多处理下完成时,进度条应该会更好地工作。即使那样,我也无法使用 alive-progress 获得任何体面的显示,诚然我没有花太多时间。所以我转而使用更常见的 tqdm 模块,可从 PyPi 存储库获得。

即使使用 tqdm 也有一个问题,当进度条达到 100% 时,tqdm 一定是在写东西(一个换行符?)重新定位其他进度条。因此,我所做的是指定 leave=False,这会导致条形图在达到 100% 时消失。但至少你可以看到所有的进度条在进行时没有失真。

from multiprocessing.pool import Pool, ThreadPool
from threading import Lock
import tqdm
from zipfile import ZipFile
import os
import heapq

def get_filepaths(directory):
    file_paths = []  # List which will store all of the full filepaths.
    # Walk the tree.
    for root, directories, files in os.walk(directory):
        for filename in files:
            # Join the two strings in order to form the full filepath.
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)  # Add it to the list.
    return file_paths  # Self-explanatory.


def get_free_position():
    """ Return the minimum possible position """
    with lock:
        free_position = heapq.heappop(free_positions)
    return free_position

def return_free_position(position):
    with lock:
        heapq.heappush(free_positions, position)

def run_performance(zip_file):
    position = get_free_position()
    with ZipFile(zip_file) as zf:
        file_list = zf.namelist()
        with tqdm.tqdm(total=len(file_list), position=position, leave=False) as bar:
            for f in file_list:
                with zf.open(f) as myfile:
                    ... # do things with myfile (perhaps myfile.read())
                    # for CPU-intensive tasks: result = pool.apply(some_function, args=(arg1, arg2, ... argn))
                    import time
                    time.sleep(.005) # simulate doing something
                bar.update()
    return_free_position(position)

def generate_zip_files():
    list_dir = ['path1', 'path2']
    for folder in list_dir:
        get_all_zips = get_filepaths(folder)
        for zip_file in get_all_zips:
            yield zip_file

# Required for Windows:
if __name__ == '__main__':
    N_THREADS = 5
    free_positions = list(range(N_THREADS)) # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, generate_zip_files()):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

上面的代码使用了一个任意大小限制为 5 的多处理线程池作为演示。您可以将 N_THREADS 增加或减少到您想要的任何值,但正如我所说,它可能会或可能不会帮助提高性能。如果您希望每个 zip 文件一个线程,则:

if __name__ == '__main__':
    zip_files = list(generate_zip_files())
    N_THREADS = len(zip_files)
    free_positions = list(range(N_THREADS)) # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, zip_files):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

Enlighten codebase there is an example 类似的东西。您只需将 process_files() 函数替换为您自己的函数即可。

在这里重新创建有点大,但想法是您实际上应该只在主进程中进行控制台输出,并使用某种形式的 IPC 来中继来自子进程的信息。 Enlighten 示例使用 IPC 队列,这是非常合理的,因为它只发送当前计数。