Python multiprocessing progress approach

I've been busy writing my first multiprocessing code and it works, yay. However, now I'd like some feedback on the progress, and I'm not sure what the best approach would be.

In short, what my code (see below) does:

What I'm looking for could be:

  1. Simple
  1. Something like:
Core 0  processing file 20 of 317 ||||||____ 60% completed
Core 1  processing file 21 of 317 |||||||||_ 90% completed
...
Core 7  processing file 18 of 317 ||________ 20% completed

I read all kinds of info about queues, pools, tqdm and I'm not sure which way to go. Could anyone point to an approach that would work in this case?

Thanks in advance!

Edit: changed the code that starts the processes, following gsb22's suggestion.

My code:

# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2


# >>> Enter directory to scan as target directory
# Raw string so backslashes aren't treated as escape sequences (e.g. \v is a vertical tab)
targetDirectory = r"E:\Projects\Programming\Python\OpenCV\videofiles"

def get_videofiles(target_directory):

    # Find all video files in directory and subdirectories and put them in a list
    videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
    # Return the list
    return videofiles


def process_file(videofile):

    '''
    What happens inside this function:
    - The video is processed and analysed using openCV
    - The result (an image) is saved to the results folder
    - Once this function receives the videofile it completes
      without the need to return anything to the main program
    '''

    # The processing code is more complex than this code below, this is just a test
    cap = cv2.VideoCapture(videofile)

    for i in range(10):
        success, frame = cap.read()

        if success:
            try:
                cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
            except Exception as e:
                print(f'Failed to write frame {i} of {videofile}: {e}')

    cap.release()


if __name__ == "__main__":

    # Create directory to save results if it doesn't exist
    if not os.path.exists(targetDirectory + '/_Results'):
        os.makedirs(targetDirectory + '/_Results')

    # Get a list of all video files in the target directory
    all_files = get_videofiles(targetDirectory)

    print(f'{len(all_files)} video files found')

    # Create list of jobs (processes)
    jobs = []

    # Create and start processes
    for file in all_files:
        proc = Process(target=process_file, args=(file,))
        jobs.append(proc)

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # TODO: Print some form of progress feedback

    print('Finished :)')


Here's a really simple way to get a progress indication at minimal cost:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    sleep(randint(1, 3))
    return randint(0, 100) < 70


files = [f"file-{i}.mp4" for i in range(20)]

success = []
failed = []
NPROC = 5
pool = Pool(NPROC)


for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
    if status:
        success.append(fn)
    else:
        failed.append(fn)

print(f"{len(success)} succeeded and {len(failed)} failed")

A few comments:

  • tqdm is a third-party library that implements progress bars really nicely. There are others. `pip install tqdm`.
  • We use a pool of NPROC processes (there's almost never a reason to manage processes yourself for something simple like this). We let the pool handle iterating our process function over the input data.
  • We signal state by having the function return a boolean (chosen at random in this example, weighted in favour of success). We didn't return the filename, although we could have, because it would have to be serialised and sent back from the child process, which is unnecessary overhead.
  • We use Pool.imap, which returns an iterator that keeps the same order as the iterable we pass in, so we can use zip to iterate over files directly. Since the iterator has an unknown size, tqdm needs to be told how long it is. (We could have used pool.map, but there's no need to commit the RAM — although for one bool it probably makes no difference.)

I've deliberately written this as a recipe. You can do a lot with multiprocessing just by using the high-level paradigms, and Pool.[i]map is one of the most useful.

References

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
https://tqdm.github.io/