如何将实时数据读入循环与 Python 中的更多处理密集型操作分开?
How to keep a real-time data read-in loop separate from more processing intensive operations in Python?
所以我有一个 OpenCV 网络摄像头源,我想尽快从中读取帧。由于 Python GIL,我的脚本可以读取帧的最快速度如下:
#Parent or maybe client(?) script
#initilize the video capture object
cam = cv2.VideoCapture(0)
while True:
ret, frame = cam.read()
# Some code to pass this numpy frame array to another python script
# (which has a queue) that is not under time constraint and also
# is doing some more computationally intensive post-processing...
if exit_condition is True:
break
我想要的是将这些帧(Numpy 数组)添加到第二个 Python 脚本(或者可能是多处理实例?)中的某种处理队列中,然后执行一些 post 处理不受时间限制,例如 cam.read() 循环是...
所以基本的想法应该是这样的:
实时(或尽可能快的)数据收集(摄像头读取)脚本
---->
分析脚本(将进行 post 处理、写入结果并生成稍微落后于数据收集的 matplotlib 图)
我做了一些研究,似乎像:管道、套接字、pyzmq 和 python 多处理都可以实现我正在寻找的东西。问题是我对以上任何一项都没有经验。
所以我的问题是什么方法最能实现我正在寻找的东西,谁能提供一个简短的例子,甚至 thoughts/ideas 来为我指明正确的方向?
非常感谢!
编辑:非常感谢史蒂夫让我走上正轨。这是我的想法的工作要点......代码可以正常工作,但如果添加更多 post- 处理步骤,那么队列大小的增长速度可能会超过主进程可以处理的速度。限制帧速率的建议可能会成为我最终使用的策略。
import time
import cv2
import multiprocessing as mp
def control_expt(connection_obj, q_obj, expt_dur):
def elapsed_time(start_time):
return time.clock()-start_time
#Wait for the signal from the parent process to begin grabbing frames
while True:
msg = connection_obj.recv()
if msg == 'Start!':
break
#initilize the video capture object
cam = cv2.VideoCapture(cv2.CAP_DSHOW + 0)
#start the clock!!
expt_start_time = time.clock()
while True:
ret, frame = cam.read()
q_obj.put_nowait((elapsed_time(expt_start_time), frame))
if elapsed_time(expt_start_time) >= expt_dur:
q_obj.put_nowait((elapsed_time(expt_start_time),'stop'))
connection_obj.close()
q_obj.close()
cam.release()
break
class test_class(object):
def __init__(self, expt_dur):
self.parent_conn, self.child_conn = mp.Pipe()
self.q = mp.Queue()
self.control_expt_process = mp.Process(target=control_expt, args=(self.child_conn, self.q, expt_dur))
self.control_expt_process.start()
def frame_processor(self):
self.parent_conn.send('Start!')
prev_time_stamp = 0
while True:
time_stamp, frame = self.q.get()
#print (time_stamp, stim_bool)
fps = 1/(time_stamp-prev_time_stamp)
prev_time_stamp = time_stamp
#Do post processing of frame here but need to be careful that q.qsize doesn't end up growing too quickly...
print (int(self.q.qsize()), fps)
if frame == 'stop':
print 'destroy all frames!'
cv2.destroyAllWindows()
break
else:
cv2.imshow('test', frame)
cv2.waitKey(30)
self.control_expt_process.terminate()
if __name__ == '__main__':
x = test_class(expt_dur = 60)
x.frame_processor()
多处理文档是一个很好的起点。 https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
我建议您阅读本文,即使您现在可能不理解它。
在您提到的其他技术上使用管道将允许您保持性能并保持代码简单。
下面是一些我没有测试过的代码,应该可以给你一个很好的起点。
from multiprocessing import Process, Pipe
def read_frames(connection_obj):
#initilize the video capture object
cam = cv2.VideoCapture(0)
while True:
ret, frame = cam.read()
connection_obj.send(frame) # is this what you want to send?
if exit_condition is True:
connection_obj.close()
break
def launch_read_frames(connection_obj):
"""
Starts the read_frames function in a separate process.
param connection_obj: pipe to send frames into.
Returns a pipe object
"""
read_frames_process = Process(target=read_frames, args=(connection_obj,)) # this trailing comma is intentional
read_frames_process.start()
read_frames_process.join()
return parent_conn
def act_on_frames(connection_obj):
while True:
frame = connection_obj.recv()
# Do some stuff with each frame here
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
launch_read_frames(child_conn)
# You could also call this function as a separate process, though in
# this instance I see no performance benefit.
act_on_frames(parent_conn)
所以我有一个 OpenCV 网络摄像头源,我想尽快从中读取帧。由于 Python GIL,我的脚本可以读取帧的最快速度如下:
#Parent or maybe client(?) script
#initilize the video capture object
cam = cv2.VideoCapture(0)
while True:
ret, frame = cam.read()
# Some code to pass this numpy frame array to another python script
# (which has a queue) that is not under time constraint and also
# is doing some more computationally intensive post-processing...
if exit_condition is True:
break
我想要的是将这些帧(Numpy 数组)添加到第二个 Python 脚本(或者可能是多处理实例?)中的某种处理队列中,然后执行一些 post 处理不受时间限制,例如 cam.read() 循环是...
所以基本的想法应该是这样的:
实时(或尽可能快的)数据收集(摄像头读取)脚本 ----> 分析脚本(将进行 post 处理、写入结果并生成稍微落后于数据收集的 matplotlib 图)
我做了一些研究,似乎像:管道、套接字、pyzmq 和 python 多处理都可以实现我正在寻找的东西。问题是我对以上任何一项都没有经验。
所以我的问题是什么方法最能实现我正在寻找的东西,谁能提供一个简短的例子,甚至 thoughts/ideas 来为我指明正确的方向?
非常感谢!
编辑:非常感谢史蒂夫让我走上正轨。这是我的想法的工作要点......代码可以正常工作,但如果添加更多 post- 处理步骤,那么队列大小的增长速度可能会超过主进程可以处理的速度。限制帧速率的建议可能会成为我最终使用的策略。
import time
import cv2
import multiprocessing as mp
def control_expt(connection_obj, q_obj, expt_dur):
def elapsed_time(start_time):
return time.clock()-start_time
#Wait for the signal from the parent process to begin grabbing frames
while True:
msg = connection_obj.recv()
if msg == 'Start!':
break
#initilize the video capture object
cam = cv2.VideoCapture(cv2.CAP_DSHOW + 0)
#start the clock!!
expt_start_time = time.clock()
while True:
ret, frame = cam.read()
q_obj.put_nowait((elapsed_time(expt_start_time), frame))
if elapsed_time(expt_start_time) >= expt_dur:
q_obj.put_nowait((elapsed_time(expt_start_time),'stop'))
connection_obj.close()
q_obj.close()
cam.release()
break
class test_class(object):
def __init__(self, expt_dur):
self.parent_conn, self.child_conn = mp.Pipe()
self.q = mp.Queue()
self.control_expt_process = mp.Process(target=control_expt, args=(self.child_conn, self.q, expt_dur))
self.control_expt_process.start()
def frame_processor(self):
self.parent_conn.send('Start!')
prev_time_stamp = 0
while True:
time_stamp, frame = self.q.get()
#print (time_stamp, stim_bool)
fps = 1/(time_stamp-prev_time_stamp)
prev_time_stamp = time_stamp
#Do post processing of frame here but need to be careful that q.qsize doesn't end up growing too quickly...
print (int(self.q.qsize()), fps)
if frame == 'stop':
print 'destroy all frames!'
cv2.destroyAllWindows()
break
else:
cv2.imshow('test', frame)
cv2.waitKey(30)
self.control_expt_process.terminate()
if __name__ == '__main__':
x = test_class(expt_dur = 60)
x.frame_processor()
多处理文档是一个很好的起点。 https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes 我建议您阅读本文,即使您现在可能不理解它。
在您提到的其他技术上使用管道将允许您保持性能并保持代码简单。
下面是一些我没有测试过的代码,应该可以给你一个很好的起点。
from multiprocessing import Process, Pipe
def read_frames(connection_obj):
#initilize the video capture object
cam = cv2.VideoCapture(0)
while True:
ret, frame = cam.read()
connection_obj.send(frame) # is this what you want to send?
if exit_condition is True:
connection_obj.close()
break
def launch_read_frames(connection_obj):
"""
Starts the read_frames function in a separate process.
param connection_obj: pipe to send frames into.
Returns a pipe object
"""
read_frames_process = Process(target=read_frames, args=(connection_obj,)) # this trailing comma is intentional
read_frames_process.start()
read_frames_process.join()
return parent_conn
def act_on_frames(connection_obj):
while True:
frame = connection_obj.recv()
# Do some stuff with each frame here
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
launch_read_frames(child_conn)
# You could also call this function as a separate process, though in
# this instance I see no performance benefit.
act_on_frames(parent_conn)