并行化进程相互依赖的 Python 进程时遇到问题
Having trouble parallelizing a Python process where Processes depend on each other
我在 Raspberry Pi 3B+ 上有一个深度学习应用程序。我有一个循环,首先从相机抓取一帧,然后将其传递给神经网络进行预测,然后在屏幕上显示带有预测的帧:
while True:
frame = cam.get_frame()
preds = model.predict(frame)
label, score, c, pts = get_best_classes(preds, model)
print("{} ({}): {:.2f}".format(label, c, score))
screen.draw(frame, pts)
抓取帧并显示非常快(实时),但预测约为 0.7 秒。当我 运行 它有大约 4 秒的延迟,这意味着当我移动相机时,屏幕上的输出只会在 4 秒后移动。我对此进行了研究,这是因为帧在模型可以预测之前就堆积起来了。 The solution是把取帧和预测放在不同的线程上,但是我没有线程和多进程的经验。
我在谷歌上搜索了很多教程,但它们都是并行打印内容的示例。我找不到介绍性教程,其中一个过程(在我的例子中是预测)取决于另一个过程的输出(从相机抓取的帧)。
所以我的问题有 3 个部分:
- 你能给我指出一个可能的解决方案吗?
- 能否请您提供一些指向教程的链接,其中进程共享数据并且一个进程仅在另一个进程完成其部分时才开始执行其部分
- 如何确保在无限循环中处理 运行?
我找到延迟的原因了。事实证明,cam.get_frame()
(这是一个围绕 cv2.VideoCapture().read()
that I wrote) has about 5-7 prebuffered frames. So each time I called it, it did not return the current frame but the next frame from the buffer. I found this 解决方案的薄包装是有帮助的。
修改代码后延迟消失:
# buffered frames
n = 5
while True:
# ignore buffered frames
for _ in range(n):
frame = cam.get_frame()
preds = model.predict(frame)
label, score, c, pts = get_best_classes(preds, model)
print("{} ({}): {:.2f}".format(label, c, score))
screen.draw(frame, pts)
如果您选择多处理路线,您可以使用多处理 Queue or a Pipe 在进程之间共享数据。队列是线程和进程安全的。使用 Pipe 时必须更加小心,因为如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。当然,同时使用管道不同端的进程不会有损坏的风险。
对于您的应用程序,我建议使用 多线程 路线。这个想法是有两个线程来避免顺序获取和处理帧。
- 线程 #1 - 专用于仅从相机流中读取帧。
- 线程 #2 - 专用于处理帧(预测)。
我们将阅读框架与处理分开,因为 cv2.VideoCapture.read()
是一个阻塞操作。因此,我们通过减少由于 I/O 操作引起的延迟,在其自己的独立线程中将帧读取到 'improve' FPS。此外,通过将帧捕获隔离到它自己的线程,将始终有一个帧准备好进行处理,而不必等待 I/O 操作完成和 return 一个新帧。在我们专用于处理的主线程中,我们现在可以自由地进行预测,而无需等待相机抓取下一帧。
这是一个专用于仅从相机流中读取帧的小部件。在主程序中,您可以自由process/predict使用最新的框架。
from threading import Thread
import cv2
class VideoStreamWidget(object):
def __init__(self, src=0):
# Create a VideoCapture object
self.capture = cv2.VideoCapture(src)
# Start the thread to read frames from the video stream
self.thread = Thread(target=self.update, args=())
self.thread.daemon = True
self.thread.start()
def update(self):
# Read the next frame from the stream in a different thread
while True:
if self.capture.isOpened():
(self.status, self.frame) = self.capture.read()
def show_frame(self):
# Display frames in main program
if self.status:
cv2.imshow('frame', self.frame)
# Press Q on keyboard to stop stream
key = cv2.waitKey(1)
if key == ord('q'):
self.capture.release()
cv2.destroyAllWindows()
exit(1)
def grab_latest_frame(self):
return self.frame
if __name__ == '__main__':
video_stream_widget = VideoStreamWidget(0)
while True:
try:
video_stream_widget.show_frame()
latest_frame = video_stream_widget.grab_latest_frame()
# Do processing here with the latest frame
# ...
# ...
except AttributeError:
pass
我在 Raspberry Pi 3B+ 上有一个深度学习应用程序。我有一个循环,首先从相机抓取一帧,然后将其传递给神经网络进行预测,然后在屏幕上显示带有预测的帧:
while True:
frame = cam.get_frame()
preds = model.predict(frame)
label, score, c, pts = get_best_classes(preds, model)
print("{} ({}): {:.2f}".format(label, c, score))
screen.draw(frame, pts)
抓取帧并显示非常快(实时),但预测约为 0.7 秒。当我 运行 它有大约 4 秒的延迟,这意味着当我移动相机时,屏幕上的输出只会在 4 秒后移动。我对此进行了研究,这是因为帧在模型可以预测之前就堆积起来了。 The solution是把取帧和预测放在不同的线程上,但是我没有线程和多进程的经验。
我在谷歌上搜索了很多教程,但它们都是并行打印内容的示例。我找不到介绍性教程,其中一个过程(在我的例子中是预测)取决于另一个过程的输出(从相机抓取的帧)。
所以我的问题有 3 个部分:
- 你能给我指出一个可能的解决方案吗?
- 能否请您提供一些指向教程的链接,其中进程共享数据并且一个进程仅在另一个进程完成其部分时才开始执行其部分
- 如何确保在无限循环中处理 运行?
我找到延迟的原因了。事实证明,cam.get_frame()
(这是一个围绕 cv2.VideoCapture().read()
that I wrote) has about 5-7 prebuffered frames. So each time I called it, it did not return the current frame but the next frame from the buffer. I found this 解决方案的薄包装是有帮助的。
修改代码后延迟消失:
# buffered frames
n = 5
while True:
# ignore buffered frames
for _ in range(n):
frame = cam.get_frame()
preds = model.predict(frame)
label, score, c, pts = get_best_classes(preds, model)
print("{} ({}): {:.2f}".format(label, c, score))
screen.draw(frame, pts)
如果您选择多处理路线,您可以使用多处理 Queue or a Pipe 在进程之间共享数据。队列是线程和进程安全的。使用 Pipe 时必须更加小心,因为如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。当然,同时使用管道不同端的进程不会有损坏的风险。
对于您的应用程序,我建议使用 多线程 路线。这个想法是有两个线程来避免顺序获取和处理帧。
- 线程 #1 - 专用于仅从相机流中读取帧。
- 线程 #2 - 专用于处理帧(预测)。
我们将阅读框架与处理分开,因为 cv2.VideoCapture.read()
是一个阻塞操作。因此,我们通过减少由于 I/O 操作引起的延迟,在其自己的独立线程中将帧读取到 'improve' FPS。此外,通过将帧捕获隔离到它自己的线程,将始终有一个帧准备好进行处理,而不必等待 I/O 操作完成和 return 一个新帧。在我们专用于处理的主线程中,我们现在可以自由地进行预测,而无需等待相机抓取下一帧。
这是一个专用于仅从相机流中读取帧的小部件。在主程序中,您可以自由process/predict使用最新的框架。
from threading import Thread
import cv2
class VideoStreamWidget(object):
def __init__(self, src=0):
# Create a VideoCapture object
self.capture = cv2.VideoCapture(src)
# Start the thread to read frames from the video stream
self.thread = Thread(target=self.update, args=())
self.thread.daemon = True
self.thread.start()
def update(self):
# Read the next frame from the stream in a different thread
while True:
if self.capture.isOpened():
(self.status, self.frame) = self.capture.read()
def show_frame(self):
# Display frames in main program
if self.status:
cv2.imshow('frame', self.frame)
# Press Q on keyboard to stop stream
key = cv2.waitKey(1)
if key == ord('q'):
self.capture.release()
cv2.destroyAllWindows()
exit(1)
def grab_latest_frame(self):
return self.frame
if __name__ == '__main__':
video_stream_widget = VideoStreamWidget(0)
while True:
try:
video_stream_widget.show_frame()
latest_frame = video_stream_widget.grab_latest_frame()
# Do processing here with the latest frame
# ...
# ...
except AttributeError:
pass