Python 的 `ThreadPoolExecutor` 没有使用执行器的数量
Python's `ThreadPoolExecutor` does not utilize number of executors
我有以下 Python 代码:
import sys
import os
from concurrent.futures import ThreadPoolExecutor
VIDEOS = [ # A list of 9 videos
{... bla bla ...},
{... bla bla ...},
{... bla bla ...}
]
SAMPLING_FREQUENCIES = [1, 2.4, 3.71, ... , 14.3] # A list of 8 frequencies
def process_video(video_obj, sampling_frequency, process_work_dir):
os.makedirs(process_work_dir)
# ... do some heavy processing ...
if __name__ == '__main__':
output_work_dir = sys.argv[1]
os.makedirs(output_work_dir)
executor = ThreadPoolExecutor(max_workers=4)
for video_obj in VIDEOS:
for samp_fps in SAMPLING_FREQUENCIES: # Totally 8 * 9 threads should be waiting to be executed
work_dir = os.path.join(output_work_dir, os.path.basename(video_obj['path']), str(samp_fps))
executor.submit(lambda : process_video(video_obj, samp_fps, work_dir))
我观察到的是,一开始,正如预期的那样,有 4 个线程 运行。我可以通过确保恰好创建了 4 个工作目录来验证它。
然后,第一个线程完成了它的工作,正如预期的那样,另一个一直在等待的线程开始了 运行。
问题是,虽然我创建了 72 个线程(9 个视频乘以 8 个采样频率),但没有线程再被执行。当第 5 个线程完成其工作时,应用程序终止。
这是一个错误,还是我对 ThreadPoolExecutor 的理解有问题API?
我用的是Python3.6.5
如果你想让执行者完成所有任务,那么你可能想在 with
块中声明它。这里我添加的打印语句将在所有线程完成后打印。
你看到的问题是执行者会提交东西而不是等待,让你的代码在遇到 EOF
解释器关闭时等待的任务时死掉。
*并非如此请忽略
还要注意这一行:
executor.submit(lambda : process_video(video_obj, samp_fps, work_dir))
这似乎是 lambda 是 process_video
方法的结果,这意味着它将在主线程中 运行 。我认为如果按照文档建议的那样指定,用法会更清晰。
executor.submit(process_video, video_obj, samp_fps, work_dir)
试试这个:
if __name__ == '__main__':
output_work_dir = sys.argv[1]
os.makedirs(output_work_dir)
with ThreadPoolExecutor(max_workers=4) as executor:
for video_obj in VIDEOS:
for samp_fps in SAMPLING_FREQUENCIES: # Totally 8 * 9 threads should be waiting to be executed
work_dir = os.path.join(output_work_dir, os.path.basename(video_obj['path']), str(samp_fps))
executor.submit(process_video, video_obj, samp_fps, work_dir)
print("Hello, all done with thread work!")
我有以下 Python 代码:
import sys
import os
from concurrent.futures import ThreadPoolExecutor
VIDEOS = [ # A list of 9 videos
{... bla bla ...},
{... bla bla ...},
{... bla bla ...}
]
SAMPLING_FREQUENCIES = [1, 2.4, 3.71, ... , 14.3] # A list of 8 frequencies
def process_video(video_obj, sampling_frequency, process_work_dir):
os.makedirs(process_work_dir)
# ... do some heavy processing ...
if __name__ == '__main__':
output_work_dir = sys.argv[1]
os.makedirs(output_work_dir)
executor = ThreadPoolExecutor(max_workers=4)
for video_obj in VIDEOS:
for samp_fps in SAMPLING_FREQUENCIES: # Totally 8 * 9 threads should be waiting to be executed
work_dir = os.path.join(output_work_dir, os.path.basename(video_obj['path']), str(samp_fps))
executor.submit(lambda : process_video(video_obj, samp_fps, work_dir))
我观察到的是,一开始,正如预期的那样,有 4 个线程 运行。我可以通过确保恰好创建了 4 个工作目录来验证它。
然后,第一个线程完成了它的工作,正如预期的那样,另一个一直在等待的线程开始了 运行。
问题是,虽然我创建了 72 个线程(9 个视频乘以 8 个采样频率),但没有线程再被执行。当第 5 个线程完成其工作时,应用程序终止。
这是一个错误,还是我对 ThreadPoolExecutor 的理解有问题API?
我用的是Python3.6.5
如果你想让执行者完成所有任务,那么你可能想在 with
块中声明它。这里我添加的打印语句将在所有线程完成后打印。
你看到的问题是执行者会提交东西而不是等待,让你的代码在遇到 EOF
解释器关闭时等待的任务时死掉。
*并非如此请忽略
还要注意这一行:
executor.submit(lambda : process_video(video_obj, samp_fps, work_dir))
这似乎是 lambda 是 process_video
方法的结果,这意味着它将在主线程中 运行 。我认为如果按照文档建议的那样指定,用法会更清晰。
executor.submit(process_video, video_obj, samp_fps, work_dir)
试试这个:
if __name__ == '__main__':
output_work_dir = sys.argv[1]
os.makedirs(output_work_dir)
with ThreadPoolExecutor(max_workers=4) as executor:
for video_obj in VIDEOS:
for samp_fps in SAMPLING_FREQUENCIES: # Totally 8 * 9 threads should be waiting to be executed
work_dir = os.path.join(output_work_dir, os.path.basename(video_obj['path']), str(samp_fps))
executor.submit(process_video, video_obj, samp_fps, work_dir)
print("Hello, all done with thread work!")