如何有效地利用多处理和多线程并行使用 python 将 1000 多个视频文件转换为音频

How to utilize multiprocessing and multithreading efficiently to convert 1000s of video files to audio using python in parallel

我尝试使用 moviepy python 包将视频文件转换为音频。它工作得很好。 但是,我有 1500 个 100MB 大小的视频,我想将它们全部转换为音频文件。使用标准方法需要花费大量时间。

将一个视频文件转换为音频的代码:

import moviepy.editor as mp
clip = mp.VideoFileClip('file.mp4') 
clip.audio.write_audiofile(r"file.mp3")

我也可以使用线程同时转换多个文件,但我想同时利用多处理和多线程来以更少的时间复杂度最有效地实现结果。

仅使用线程的算法:

clip1...clip10= make 10 lists with 150 files names from os.listdir()
spawn 10 threads to process 10 files at a time.

t1= Thread(target=convert, args=(clips1))
.
.
.
t10= Thread(target=convert, args=(clips2))

有什么想法吗?

如果您的所有视频都具有相同的音频编解码器(AAC 又名 MP4 音频),您只需将音频流混合到单独的文件中即可。不需要 convert/encode 任何东西。

ffmpeg -i input.mp4 -vn -c:a copy audio.m4a

或者,如果你真的想要 MP3 文件(我不推荐这个,MP3 已经过时了),你可以执行以下操作:

ffmpeg -i input.mp4 -vn -c:a mp3 -b:a 256k audio.mp3

每个视频只需要几毫秒。

在这种情况下,多线程和多处理的组合可能是有利的,这是当正在执行的任务由整齐划定的部分组成时,其中一个部分主要是 I/O 绑定(或至少放弃全局Interpreter Lock 经常允许其他线程 运行) 而另一部分是 CPU 密集型。例如,您需要执行由两部分组成的多项任务:(1) 从网站检索一条信息,以及 (2) 然后使用该信息进行一些重要的计算。第 1 部分显然非常适合多线程,因为在发出检索 URL 的请求后,线程将进入等待状态,允许其他线程 运行。如果第 2 部分是一个微不足道的计算,为了简单起见,您只需在线程内计算即可。但是因为我们说它是非常重要的,所以在一个我们不必担心全局解释器锁 (GIL) 争用的单独进程中执行计算会更好。

执行上述类型处理的模型是同时创建线程池和多处理池。 “作业”被提交给线程池工作函数,指定需要从中检索信息的网站的 URL 作为一个参数,将多处理池作为另一个参数。线程池工作函数首先从传递的 URL 中检索所需的信息,然后提交给第二个工作函数,该函数使用传递的多处理池执行计算。

就是说,我不太明白您的情况如何巧妙地划分为纯 I/O 绑定部分和纯 CPU 绑定部分。调用 clip = mp.VideoFileClip('file.mp4') 显然是在执行 I/O 和处理数据以供查看。同样,clip.audio.write_audiofile(r"file.mp3") 进行 CPU 处理以将视频剪辑转换为音频剪辑,我认为这主要是一个 CPU 绑定过程,然后写出文件,这显然是I/O 绑定进程。

如果 API 的设计不同,文件的读取和写入是不同的方法,那么我认为同时使用线程和多处理会更可行。例如:

with open('file.mp4', 'rb') as f:
    mp4_file = f.read() # I/O
clip = mp.VideoClipFromMemory(mp4_file) # CPU
clip.convertToAudio() # CPU
clip.writeFile('file.mp3') # I/O

所以最大的问题是:您将视频转换为音频的“工作”是 CPU 绑定还是 I/O 绑定。如果是前者,那么您应该使用多处理池,这可能会因池大小大于您拥有的 CPU 核心数而受益,因为进程在等待 CPU 时毕竟会进入等待状态 I/O 才能完成,因为作业并非纯粹 CPU-绑定。如果是后者,那么您应该使用多线程,因为线程在创建时所涉及的开销较少。但我怀疑你会用多处理做得更好。下面的代码,有几个小的变化可以使用:

import moviepy.editor as mp
import glob
import os
from concurrent.futures import ProcessPoolExecutor as Executor
# To use multithreading:
# from concurrent.futures import ThreadPoolExecutor as Executor

def converter(filename):
    clip = mp.VideoFileClip(f'{filename}.mp4') 
    clip.audio.write_audiofile(f'{filename}.mp3')

def main():
    mp4_filenames = map(lambda x: x.split('.')[0], glob.iglob('*.mp4'))
    POOL_SIZE = os.cpu_count() # number of cores
    # You might want to try a larger size, especially if you are using a thread pool:
    with Executor(max_workers=POOL_SIZE) as executor:
        executor.map(converter, mp4_filenames)

# required for multiprocessing under Windows
if __name__ == '__main__':
    main()

补充Comment/Suggestion

我的建议是尝试一个小样本,比如 100 个文件,两种方法(ProcessPoolExecutor 和 ThreadPoolExecutor)都使用相同的池大小 os.cpu_count() 和 运行ning 针对相同的 100 个文件看看哪一个在更短的时间内完成。如果是 ProcessPoolExecutor 运行,您可以查看增加池大小是否有助于重叠 I/O 处理并提高吞吐量。如果是 ThreadPoolExecutor 运行,您可以大大增加线程池大小,直到看到性能下降为止。线程池大小为 100(或处理所有文件时更大)并非不合理。