Python 多处理进度方法
Python multiprocessing progress approach
我一直在忙着编写我的第一个多处理代码并且它工作正常,是的。
但是,现在我想要一些关于进展的反馈,我不确定最好的方法是什么。
简而言之,我的代码(见下文)做了什么:
- 扫描目标目录中的 mp4 文件
- 每个文件由一个单独的进程分析,该进程保存一个结果(图像)
我正在寻找的可能是:
- 简单
- 每次进程完成一个文件时,它都会发送一条 'finished' 消息
- 主要代码记录完成了多少文件
- 喜欢
Core 0 processing file 20 of 317 ||||||____ 60% completed
Core 1 processing file 21 of 317 |||||||||_ 90% completed
...
Core 7 processing file 18 of 317 ||________ 20% completed
我阅读了有关队列、池、tqdm 的各种信息,但我不确定该走哪条路。谁能指出在这种情况下可行的方法?
提前致谢!
编辑:按照 gsb22
的建议更改了启动进程的代码
我的代码:
# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2
# >>> Enter directory to scan as target directory
targetDirectory = "E:\Projects\Programming\Python\OpenCV\videofiles"
def get_videofiles(target_directory):
# Find all video files in directory and subdirectories and put them in a list
videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
# Return the list
return videofiles
def process_file(videofile):
'''
What happens inside this function:
- The video is processed and analysed using openCV
- The result (an image) is saved to the results folder
- Once this function receives the videofile it completes
without the need to return anything to the main program
'''
# The processing code is more complex than this code below, this is just a test
cap = cv2.VideoCapture(videofile)
for i in range(10):
succes, frame = cap.read()
# cv2.imwrite('{}/_Results/{}_result{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
if succes:
try:
cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
except:
print('something went wrong')
if __name__ == "__main__":
# Create directory to save results if it doesn't exist
if not os.path.exists(targetDirectory + '/_Results'):
os.makedirs(targetDirectory + '/_Results')
# Get a list of all video files in the target directory
all_files = get_videofiles(targetDirectory)
print(f'{len(all_files)} video files found')
# Create list of jobs (processes)
jobs = []
# Create and start processes
for file in all_files:
proc = Process(target=process_file, args=(file,))
jobs.append(proc)
for job in jobs:
job.start()
for job in jobs:
job.join()
# TODO: Print some form of progress feedback
print('Finished :)')
I read all kinds of info about queues, pools, tqdm and I'm not sure which way to go. Could anyone point to an approach that would work in this case?
这是一种以最低成本获取进度指示的非常简单的方法:
from multiprocessing.pool import Pool
from random import randint
from time import sleep
from tqdm import tqdm
def process(fn) -> bool:
sleep(randint(1, 3))
return randint(0, 100) < 70
files = [f"file-{i}.mp4" for i in range(20)]
success = []
failed = []
NPROC = 5
pool = Pool(NPROC)
for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
if status:
success.append(fn)
else:
failed.append(fn)
print(f"{len(success)} succeeded and {len(failed)} failed")
一些评论:
- tqdm 是一个非常好地实现进度条的第 3 方库。还有其他人。
pip install tqdm
.
- 我们使用
NPROC
个进程的池(几乎没有理由为像这样的简单事情自己管理进程)。我们让池处理对输入数据迭代我们的过程函数。
- 我们通过函数 return 一个布尔值来表示状态(在这个例子中我们随机选择,权重有利于成功)。我们没有 return 文件名,尽管我们可以,因为它必须被序列化并从子进程发送,这是不必要的开销。
- 我们使用
Pool.imap
,它 return 是一个迭代器,它与我们传入的可迭代对象保持相同的顺序。所以我们可以使用 zip
来迭代 files
直接地。因为我们使用了一个大小未知的迭代器,所以 tqdm
需要被告知它有多长。 (我们本可以使用 pool.map
,但是没有必要提交 ram---尽管对于一个 bool 它可能没有什么区别。)
我特意把它写成一种食谱。只需使用 high-level 范式下降,您就可以对多处理做很多事情,Pool.[i]map
是最有用的范例之一。
参考资料
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
https://tqdm.github.io/
我一直在忙着编写我的第一个多处理代码并且它工作正常,是的。 但是,现在我想要一些关于进展的反馈,我不确定最好的方法是什么。
简而言之,我的代码(见下文)做了什么:
- 扫描目标目录中的 mp4 文件
- 每个文件由一个单独的进程分析,该进程保存一个结果(图像)
我正在寻找的可能是:
- 简单
- 每次进程完成一个文件时,它都会发送一条 'finished' 消息
- 主要代码记录完成了多少文件
- 喜欢
Core 0 processing file 20 of 317 ||||||____ 60% completed
Core 1 processing file 21 of 317 |||||||||_ 90% completed
...
Core 7 processing file 18 of 317 ||________ 20% completed
我阅读了有关队列、池、tqdm 的各种信息,但我不确定该走哪条路。谁能指出在这种情况下可行的方法?
提前致谢!
编辑:按照 gsb22
的建议更改了启动进程的代码我的代码:
# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2
# >>> Enter directory to scan as target directory
targetDirectory = "E:\Projects\Programming\Python\OpenCV\videofiles"
def get_videofiles(target_directory):
# Find all video files in directory and subdirectories and put them in a list
videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
# Return the list
return videofiles
def process_file(videofile):
'''
What happens inside this function:
- The video is processed and analysed using openCV
- The result (an image) is saved to the results folder
- Once this function receives the videofile it completes
without the need to return anything to the main program
'''
# The processing code is more complex than this code below, this is just a test
cap = cv2.VideoCapture(videofile)
for i in range(10):
succes, frame = cap.read()
# cv2.imwrite('{}/_Results/{}_result{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
if succes:
try:
cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
except:
print('something went wrong')
if __name__ == "__main__":
# Create directory to save results if it doesn't exist
if not os.path.exists(targetDirectory + '/_Results'):
os.makedirs(targetDirectory + '/_Results')
# Get a list of all video files in the target directory
all_files = get_videofiles(targetDirectory)
print(f'{len(all_files)} video files found')
# Create list of jobs (processes)
jobs = []
# Create and start processes
for file in all_files:
proc = Process(target=process_file, args=(file,))
jobs.append(proc)
for job in jobs:
job.start()
for job in jobs:
job.join()
# TODO: Print some form of progress feedback
print('Finished :)')
I read all kinds of info about queues, pools, tqdm and I'm not sure which way to go. Could anyone point to an approach that would work in this case?
这是一种以最低成本获取进度指示的非常简单的方法:
from multiprocessing.pool import Pool
from random import randint
from time import sleep
from tqdm import tqdm
def process(fn) -> bool:
sleep(randint(1, 3))
return randint(0, 100) < 70
files = [f"file-{i}.mp4" for i in range(20)]
success = []
failed = []
NPROC = 5
pool = Pool(NPROC)
for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
if status:
success.append(fn)
else:
failed.append(fn)
print(f"{len(success)} succeeded and {len(failed)} failed")
一些评论:
- tqdm 是一个非常好地实现进度条的第 3 方库。还有其他人。
pip install tqdm
. - 我们使用
NPROC
个进程的池(几乎没有理由为像这样的简单事情自己管理进程)。我们让池处理对输入数据迭代我们的过程函数。 - 我们通过函数 return 一个布尔值来表示状态(在这个例子中我们随机选择,权重有利于成功)。我们没有 return 文件名,尽管我们可以,因为它必须被序列化并从子进程发送,这是不必要的开销。
- 我们使用
Pool.imap
,它 return 是一个迭代器,它与我们传入的可迭代对象保持相同的顺序。所以我们可以使用zip
来迭代files
直接地。因为我们使用了一个大小未知的迭代器,所以tqdm
需要被告知它有多长。 (我们本可以使用pool.map
,但是没有必要提交 ram---尽管对于一个 bool 它可能没有什么区别。)
我特意把它写成一种食谱。只需使用 high-level 范式下降,您就可以对多处理做很多事情,Pool.[i]map
是最有用的范例之一。
参考资料
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool https://tqdm.github.io/