多进程视频处理
Multi process Video Processing
我想对相邻帧进行视频处理。更具体地说,我想计算相邻帧之间的均方误差:
mean_squared_error(prev_frame,frame)
我知道如何以线性直接的方式计算它:我使用 imutils 包来利用队列来分离加载帧和处理它们。通过将它们存储在队列中,我不需要在处理它们之前等待它们。 ...但我想更快...
# import the necessary packages to read the video
import imutils
from imutils.video import FileVideoStream
# package to compute mean squared errror
from skimage.metrics import mean_squared_error
if __name__ == '__main__':
# SPECIFY PATH TO VIDEO FILE
file = "VIDEO_PATH.mp4"
# START IMUTILS VIDEO STREAM
print("[INFO] starting video file thread...")
fvs = FileVideoStream(path_video, transform=transform_image).start()
# INITALIZE LIST to store the results
mean_square_error_list = []
# READ PREVIOUS FRAME
prev_frame = fvs.read()
# LOOP over frames from the video file stream
while fvs.more():
# GRAP THE NEXT FRAME from the threaded video file stream
frame = fvs.read()
# COMPUTE the metric
metric_val = mean_squared_error(prev_frame,frame)
mean_square_error_list.append(1-metric_val) # Append to list
# UPDATE previous frame variable
prev_frame = frame
现在我的问题是:如何对指标的计算进行多重处理以提高速度并节省时间?
My operating system is Windows 10 and I am using python 3.8.0
让事情变得更快的方面太多了,我只关注多处理部分。
由于您不想一次阅读整个视频,我们必须逐帧阅读视频。
我将使用 opencv (cv2), numpy 读取帧,计算 mse,并将 mse 保存到磁盘.
首先,我们可以在没有任何多处理的情况下开始,这样我们就可以对我们的结果进行基准测试。我正在使用 1920 x 1080 尺寸的视频,60 FPS,时长:1:29 ,大小:100 MB。
import cv2
import sys
import time
import numpy as np
import subprocess as sp
import multiprocessing as mp
filename = '2.mp4'
def process_video():
cap = cv2.VideoCapture(filename)
proc_frames = 0
mse = []
prev_frame = None
ret = True
while ret:
ret, frame = cap.read() # reading frames sequentially
if ret == False:
break
if not (prev_frame is None):
c_mse = np.mean(np.square(prev_frame-frame))
mse.append(c_mse)
prev_frame = frame
proc_frames += 1
np.save('data/' + 'sp' + '.npy', np.array(mse))
cap.release()
return
if __name__ == "__main__":
t1 = time.time()
process_video()
t2 = time.time()
print(t2-t1)
在我的系统中,它 运行 持续 142 秒。
现在,我们可以采用多处理方法。这个想法可以用下图来概括。
GIF 出处:Google
我们制作一些片段(基于我们有多少 cpu 个核心)并并行处理这些片段帧。
import cv2
import sys
import time
import numpy as np
import subprocess as sp
import multiprocessing as mp
filename = '2.mp4'
def process_video(group_number):
cap = cv2.VideoCapture(filename)
num_processes = mp.cpu_count()
frame_jump_unit = cap.get(cv2.CAP_PROP_FRAME_COUNT) // num_processes
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_jump_unit * group_number)
proc_frames = 0
mse = []
prev_frame = None
while proc_frames < frame_jump_unit:
ret, frame = cap.read()
if ret == False:
break
if not (prev_frame is None):
c_mse = np.mean(np.square(prev_frame-frame))
mse.append(c_mse)
prev_frame = frame
proc_frames += 1
np.save('data/' + str(group_number) + '.npy', np.array(mse))
cap.release()
return
if __name__ == "__main__":
t1 = time.time()
num_processes = mp.cpu_count()
print(f'CPU: {num_processes}')
# only meta-data
cap = cv2.VideoCapture(filename)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_jump_unit = cap.get(cv2.CAP_PROP_FRAME_COUNT) // num_processes
cap.release()
p = mp.Pool(num_processes)
p.map(process_video, range(num_processes))
# merging
# the missing mse will be
final_mse = []
for i in range(num_processes):
na = np.load(f'data/{i}.npy')
final_mse.extend(na)
try:
cap = cv2.VideoCapture(filename) # you could also take it outside the loop to reduce some overhead
frame_no = (frame_jump_unit) * (i+1) - 1
print(frame_no)
cap.set(1, frame_no)
_, frame1 = cap.read()
#cap.set(1, ((frame_jump_unit) * (i+1)))
_, frame2 = cap.read()
c_mse = np.mean(np.square(frame1-frame2))
final_mse.append(c_mse)
cap.release()
except:
print('failed in 1 case')
# in the last few frames, nothing left
pass
t2 = time.time()
print(t2-t1)
np.save(f'data/final_mse.npy', np.array(final_mse))
我只使用 numpy save
来保存部分结果,您可以尝试更好的方法。
这个 运行s 49.56 秒 我的 cpu_count
= 12。确实有一些瓶颈可以避免 运行 更快。
我的实现的唯一问题是,它缺少 mse
用于分割视频的区域,添加起来非常容易。由于我们可以在 O(1) 中使用 OpenCV 在任何位置索引单个帧,因此我们可以转到这些位置并分别计算 mse
并合并到最终解决方案。 [检查修复合并部分的更新代码]
您可以编写一个简单的完整性检查来确保两者提供相同的结果。
import numpy as np
a = np.load('data/sp.npy')
b = np.load('data/final_mse.npy')
print(a.shape)
print(b.shape)
print(a[:10])
print(b[:10])
for i in range(len(a)):
if a[i] != b[i]:
print(i)
现在,一些额外的加速可以来自使用 CUDA 编译的 opencv、ffmpeg、添加排队机制和多处理等。
我想对相邻帧进行视频处理。更具体地说,我想计算相邻帧之间的均方误差:
mean_squared_error(prev_frame,frame)
我知道如何以线性直接的方式计算它:我使用 imutils 包来利用队列来分离加载帧和处理它们。通过将它们存储在队列中,我不需要在处理它们之前等待它们。 ...但我想更快...
# import the necessary packages to read the video
import imutils
from imutils.video import FileVideoStream
# package to compute mean squared errror
from skimage.metrics import mean_squared_error
if __name__ == '__main__':
# SPECIFY PATH TO VIDEO FILE
file = "VIDEO_PATH.mp4"
# START IMUTILS VIDEO STREAM
print("[INFO] starting video file thread...")
fvs = FileVideoStream(path_video, transform=transform_image).start()
# INITALIZE LIST to store the results
mean_square_error_list = []
# READ PREVIOUS FRAME
prev_frame = fvs.read()
# LOOP over frames from the video file stream
while fvs.more():
# GRAP THE NEXT FRAME from the threaded video file stream
frame = fvs.read()
# COMPUTE the metric
metric_val = mean_squared_error(prev_frame,frame)
mean_square_error_list.append(1-metric_val) # Append to list
# UPDATE previous frame variable
prev_frame = frame
现在我的问题是:如何对指标的计算进行多重处理以提高速度并节省时间?
My operating system is Windows 10 and I am using python 3.8.0
让事情变得更快的方面太多了,我只关注多处理部分。
由于您不想一次阅读整个视频,我们必须逐帧阅读视频。
我将使用 opencv (cv2), numpy 读取帧,计算 mse,并将 mse 保存到磁盘.
首先,我们可以在没有任何多处理的情况下开始,这样我们就可以对我们的结果进行基准测试。我正在使用 1920 x 1080 尺寸的视频,60 FPS,时长:1:29 ,大小:100 MB。
import cv2
import sys
import time
import numpy as np
import subprocess as sp
import multiprocessing as mp
filename = '2.mp4'
def process_video():
cap = cv2.VideoCapture(filename)
proc_frames = 0
mse = []
prev_frame = None
ret = True
while ret:
ret, frame = cap.read() # reading frames sequentially
if ret == False:
break
if not (prev_frame is None):
c_mse = np.mean(np.square(prev_frame-frame))
mse.append(c_mse)
prev_frame = frame
proc_frames += 1
np.save('data/' + 'sp' + '.npy', np.array(mse))
cap.release()
return
if __name__ == "__main__":
t1 = time.time()
process_video()
t2 = time.time()
print(t2-t1)
在我的系统中,它 运行 持续 142 秒。
现在,我们可以采用多处理方法。这个想法可以用下图来概括。
GIF 出处:Google
我们制作一些片段(基于我们有多少 cpu 个核心)并并行处理这些片段帧。
import cv2
import sys
import time
import numpy as np
import subprocess as sp
import multiprocessing as mp
filename = '2.mp4'
def process_video(group_number):
cap = cv2.VideoCapture(filename)
num_processes = mp.cpu_count()
frame_jump_unit = cap.get(cv2.CAP_PROP_FRAME_COUNT) // num_processes
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_jump_unit * group_number)
proc_frames = 0
mse = []
prev_frame = None
while proc_frames < frame_jump_unit:
ret, frame = cap.read()
if ret == False:
break
if not (prev_frame is None):
c_mse = np.mean(np.square(prev_frame-frame))
mse.append(c_mse)
prev_frame = frame
proc_frames += 1
np.save('data/' + str(group_number) + '.npy', np.array(mse))
cap.release()
return
if __name__ == "__main__":
t1 = time.time()
num_processes = mp.cpu_count()
print(f'CPU: {num_processes}')
# only meta-data
cap = cv2.VideoCapture(filename)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_jump_unit = cap.get(cv2.CAP_PROP_FRAME_COUNT) // num_processes
cap.release()
p = mp.Pool(num_processes)
p.map(process_video, range(num_processes))
# merging
# the missing mse will be
final_mse = []
for i in range(num_processes):
na = np.load(f'data/{i}.npy')
final_mse.extend(na)
try:
cap = cv2.VideoCapture(filename) # you could also take it outside the loop to reduce some overhead
frame_no = (frame_jump_unit) * (i+1) - 1
print(frame_no)
cap.set(1, frame_no)
_, frame1 = cap.read()
#cap.set(1, ((frame_jump_unit) * (i+1)))
_, frame2 = cap.read()
c_mse = np.mean(np.square(frame1-frame2))
final_mse.append(c_mse)
cap.release()
except:
print('failed in 1 case')
# in the last few frames, nothing left
pass
t2 = time.time()
print(t2-t1)
np.save(f'data/final_mse.npy', np.array(final_mse))
我只使用 numpy save
来保存部分结果,您可以尝试更好的方法。
这个 运行s 49.56 秒 我的 cpu_count
= 12。确实有一些瓶颈可以避免 运行 更快。
我的实现的唯一问题是,它缺少 mse
用于分割视频的区域,添加起来非常容易。由于我们可以在 O(1) 中使用 OpenCV 在任何位置索引单个帧,因此我们可以转到这些位置并分别计算 mse
并合并到最终解决方案。 [检查修复合并部分的更新代码]
您可以编写一个简单的完整性检查来确保两者提供相同的结果。
import numpy as np
a = np.load('data/sp.npy')
b = np.load('data/final_mse.npy')
print(a.shape)
print(b.shape)
print(a[:10])
print(b[:10])
for i in range(len(a)):
if a[i] != b[i]:
print(i)
现在,一些额外的加速可以来自使用 CUDA 编译的 opencv、ffmpeg、添加排队机制和多处理等。