How to keep a real-time data read-in loop separate from more processing intensive operations in Python?

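So I have an OpenCV webcam feed that I'd like to read frames from as quickly as possible. Because of the Python GIL, the fastest rate at which my script can read in frames seems to be the following: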

# Parent or maybe client(?) script
import cv2

# Initialize the video capture object
cam = cv2.VideoCapture(0)

exit_condition = False  # placeholder: set to True when the loop should stop

while True:
    ret, frame = cam.read()

    # Some code to pass this numpy frame array to another python script
    # (which has a queue) that is not under time constraint and also
    # is doing some more computationally intensive post-processing...

    if exit_condition:
        break

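What I'd like to happen is to have these frames (NumPy arrays) added to some kind of processing queue in a second Python script (or perhaps a multiprocessing instance?) that then does some post-processing that is not under the time constraints that the cam.read() loop is...

So the basic idea would look something like:

Real-time (or as fast as I can get) data collection (camera read) script ----> Analysis script (which would do the post-processing, write results, and produce matplotlib plots that lag slightly behind the data collection)

I've done some research and it seems that pipes, sockets, pyzmq, and Python multiprocessing might all be able to achieve what I'm looking for. The problem is that I have no experience with any of them.

So my question is: which method will best achieve what I'm looking for, and can anyone provide a short example, or even some thoughts/ideas, to point me in the right direction?

Many thanks!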

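EDIT: Many thanks to Steve for getting me started on the right track. Here is a working gist of what I had in mind... The code works as is, but if more post-processing steps are added, the queue will likely grow faster than the main process can work through it. The suggestion to limit the frame rate will likely be the strategy I end up using.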

import time
import cv2
import multiprocessing as mp

def control_expt(connection_obj, q_obj, expt_dur):

    def elapsed_time(start_time):
        # time.clock() was removed in Python 3.8; perf_counter() replaces it
        return time.perf_counter() - start_time

    # Wait for the signal from the parent process to begin grabbing frames
    while True:
        msg = connection_obj.recv()
        if msg == 'Start!':
            break

    # Initialize the video capture object (CAP_DSHOW selects DirectShow on Windows)
    cam = cv2.VideoCapture(cv2.CAP_DSHOW + 0)

    # Start the clock!!
    expt_start_time = time.perf_counter()

    while True:
        ret, frame = cam.read()
        if ret:
            # Only queue frames that were actually captured
            q_obj.put_nowait((elapsed_time(expt_start_time), frame))

        if elapsed_time(expt_start_time) >= expt_dur:
            q_obj.put_nowait((elapsed_time(expt_start_time), 'stop'))
            connection_obj.close()
            q_obj.close()
            cam.release()
            break

class test_class(object):
    def __init__(self, expt_dur):

        self.parent_conn, self.child_conn = mp.Pipe()
        self.q  = mp.Queue()
        self.control_expt_process = mp.Process(target=control_expt, args=(self.child_conn, self.q, expt_dur))
        self.control_expt_process.start()

    def frame_processor(self):
        self.parent_conn.send('Start!')

        prev_time_stamp = 0

        while True:
            time_stamp, frame = self.q.get()
            # Guard against a zero interval between consecutive timestamps
            dt = time_stamp - prev_time_stamp
            fps = 1.0 / dt if dt > 0 else float('inf')
            prev_time_stamp = time_stamp

            # Do post processing of frame here but need to be careful that q.qsize doesn't end up growing too quickly...
            print(int(self.q.qsize()), fps)

            # Check the type first: `frame == 'stop'` on a NumPy array is an elementwise comparison
            if isinstance(frame, str) and frame == 'stop':
                print('destroy all frames!')
                cv2.destroyAllWindows()
                break
            else:
                cv2.imshow('test', frame)
                cv2.waitKey(30)

        # The child exits on its own after sending 'stop', so wait for it
        self.control_expt_process.join()

if __name__ == '__main__':  
    x = test_class(expt_dur = 60)
    x.frame_processor()
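
The queue growth described in the edit above can be bounded by giving the queue a maximum size and dropping frames whenever the consumer falls behind. This is only a minimal sketch, assuming dropped frames are acceptable for the experiment; `offer` is a hypothetical helper name, not part of the code above.

```python
import queue  # multiprocessing.Queue raises queue.Full, just like queue.Queue


def offer(q, item):
    """Enqueue item without blocking. Return False (item dropped) if q is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False
```

With a bounded queue such as `mp.Queue(maxsize=100)` (the size is an arbitrary illustration), the capture loop could call `offer(q_obj, (elapsed_time(expt_start_time), frame))` in place of `q_obj.put_nowait(...)`. The final 'stop' message should probably still use a blocking `q_obj.put(...)` so that it is never dropped.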

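The multiprocessing documentation is a great place to start. https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes I suggest reading it even if you might not understand all of it right now.

Using pipes rather than the other techniques you mentioned will allow you to maintain performance and keep your code simple.

Below is some code that I haven't tested, but it should give you a good place to start.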

import cv2
from multiprocessing import Process, Pipe

def read_frames(connection_obj):
    # Initialize the video capture object
    cam = cv2.VideoCapture(0)
    while True:
        ret, frame = cam.read()
        if ret:
            connection_obj.send(frame)  # is this what you want to send?

        # Placeholder stop test (here: a failed read); replace with your own
        exit_condition = not ret
        if exit_condition:
            cam.release()
            connection_obj.close()
            break

def launch_read_frames(connection_obj):
    """
    Starts the read_frames function in a separate process.
    param connection_obj: pipe to send frames into.
    Returns the started Process object.
    """
    read_frames_process = Process(target=read_frames, args=(connection_obj,))  # this trailing comma is intentional
    read_frames_process.start()
    # No join() here: it would block until the reader exits while
    # nothing is consuming frames from the pipe.
    return read_frames_process

def act_on_frames(connection_obj):
    while True:
        try:
            frame = connection_obj.recv()
        except EOFError:
            # The reader closed its end of the pipe
            break
        # Do some stuff with each frame here

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    read_frames_process = launch_read_frames(child_conn)
    # Close the parent's copy of the child end so recv() can raise EOFError
    child_conn.close()

    # You could also call this function as a separate process, though in
    # this instance I see no performance benefit.
    act_on_frames(parent_conn)
    read_frames_process.join()
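
The pipe pattern above can also be tried without a camera. The following is a rough sketch under stated assumptions, not part of the answer's code: `produce`, `consume`, and `demo` are hypothetical names, plain lists stand in for frames, and `None` is used as an end-of-stream marker so the receiver knows when to stop.

```python
from multiprocessing import Pipe, Process


def produce(conn, frames):
    """Send every frame, then None as an end-of-stream marker."""
    for frame in frames:
        conn.send(frame)
    conn.send(None)
    conn.close()


def consume(conn):
    """Receive frames until the None marker arrives; return them as a list."""
    received = []
    while True:
        frame = conn.recv()
        if frame is None:
            break
        received.append(frame)
    return received


def demo():
    """Run produce() in a child process and consume() in the parent."""
    parent_conn, child_conn = Pipe()
    worker = Process(target=produce, args=(child_conn, [[1, 2], [3, 4]]))
    worker.start()
    frames = consume(parent_conn)
    worker.join()
    return frames
```

Calling `demo()` from under an `if __name__ == '__main__':` guard runs the sender in its own process. An explicit marker like this is one alternative to relying on the pipe being closed; which is preferable probably depends on how the capture loop decides to stop.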