为 for 循环创建主进程

Creating main process for a for loop

这个程序 returns 视频的分辨率,但由于我需要一个大型项目,所以我需要多处理。我已经尝试使用不同的函数并进行并行处理,但这只会 运行 它多次而不会使其有效我发布了整个代码。你能帮我创建一个占用所有内核的主进程吗?

import os
from tkinter.filedialog import askdirectory
from moviepy.editor import VideoFileClip


if __name__ == "__main__":
    dire = askdirectory()
    d = dire[:]
    print(dire)
    death = os.listdir(dire)
    print(death)
    for i in death: #multiprocess this loop
        dire = d
        dire += f"/{i}"
        v = VideoFileClip(dire)
        print(f"{i}: {v.size}")

这段代码工作正常,但我需要帮助来单独为 for 循环创建一个主进程(使用所有内核)。你能原谅我对多处理很生气的变量名吗?另外,如果您有提高代码效率的提示,我将不胜感激。

我想你是在假设目录中的每个文件都是一个视频剪辑。我假设处理视频剪辑是一个 I/O 绑定的“进程”,线程适用于此。在这里,我相当随意地以这种方式创建了一个大小为 20 个线程的线程池:

MAX_WORKERS = 20 # never more than this
N_WORKERS = min(MAX_WORKERS, len(death))

您必须试验 MAX_WORKERS 在性能下降之前可以有多大。这可能是一个较低的数字,不是因为您的系统不能支持大量线程,而是因为并发访问磁盘上可能分布在介质上的多个文件可能效率低下。

import os
from tkinter.filedialog import askdirectory
from moviepy.editor import VideoFileClip
from concurrent.futures import ThreadPoolExecutor as Executor
from functools import partial


def process_video(parent_dir, file):
    v = VideoFileClip(f"{parent_dir}/{file}")
    print(f"{file}: {v.size}")


if __name__ == "__main__":
    dire = askdirectory()
    print(dire)
    death = os.listdir(dire)
    print(death)
    worker = partial(process_video, dire)
    MAX_WORKERS = 20 # never more than this
    N_WORKERS = min(MAX_WORKERS, len(death))
    with Executor(max_workers=N_WORKERS) as executor:
        results = executor.map(worker, death) # results is a list: [None, None, ...]

更新

根据@Reishin 的说法,moviepy 导致执行 ffmpeg 可执行文件,因此最终创建了一个正在完成工作的进程。所以我们在这里也使用多处理是没有意义的。

moviepy 只是 ffmpeg 的包装器,旨在编辑剪辑,因此一次只能处理一个文件 - 性能很差。每次为多个文件调用新进程非常耗时。最后,需要多个进程可能是因为选择了错误的库。

我建议改用 pyAV lib,它为 ffmpeg 提供了直接的 py 绑定和良好的性能:

import av
import os
from tkinter.filedialog import askdirectory
import multiprocessing
from concurrent.futures import ThreadPoolExecutor as Executor

MAX_WORKERS = int(multiprocessing.cpu_count() * 1.5)

def get_video_resolution(path):
  container = None
  try:
    container = av.open(path)
    frame = next(container.decode(video=0))
    return path, f"{frame.width}x{frame.height}"
  finally:
    if container:
      container.close()

def files_to_proccess():
  video_dir = askdirectory()
  return (full_file_path for f in os.listdir(video_dir) if (full_file_path := os.path.join(video_dir, f)) and not os.path.isdir(full_file_path))


def main():   
 for f in files_to_proccess():
    print(f"{os.path.basename(f)}: {get_video_resolution(f)[1]}")


def main_multi_threaded():
  with Executor(max_workers=MAX_WORKERS) as executor:
    for path, resolution in executor.map(get_video_resolution, files_to_proccess()):
        print(f"{os.path.basename(path)}: {resolution}")


if __name__ == "__main__":
  #main()
  main_multi_threaded()

以上是单线程和多线程的实现,具有最佳的并行度设置(如果绝对需要多线程)