How to implement pipelining in Python?

I have a program that processes live video of some markers.

It is divided into:

  1. Import the next image of the video
  2. Convert the image into a readable form
  3. Detect the markers
  4. Track the markers
  5. Draw the UI

This works fine on my PC, but it also needs to work on a Raspberry Pi, so using only one core the whole time won't cut it.

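That's why I want to introduce pipelining. In my computer architecture course at university I learned about hardware pipelining, so I was wondering whether it is possible to implement something similar in Python: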

So instead of doing Import -> Convert -> Process -> Track -> Draw -> ...

I want to do this:

-1----2----3----4-----5----...
Imp--Imp--Imp--Imp---Imp---...
-----Conv-Conv-Conv--Conv--...
----------Pro--Pro---Pro---...
---------------Track-Track-...
---------------------Draw--...

That way an image is ready every "clock cycle" and not only every 5th cycle.
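To make the "clock cycle" idea concrete, here is a small timing model. It is a sketch under stated assumptions: one worker per stage, unlimited buffering between stages, and a new frame always available. A frame can enter a stage only once it has left the previous stage and that stage has finished the previous frame. The function names are hypothetical.

```python
def sequential_finish_times(stage_times, n_frames):
    """Each frame passes through all stages before the next frame starts."""
    total = sum(stage_times)
    return [total * (i + 1) for i in range(n_frames)]


def pipelined_finish_times(stage_times, n_frames):
    """One worker per stage: a frame enters stage s once it has left stage s-1
    and stage s has finished the previous frame."""
    stage_free_at = [0] * len(stage_times)  # when each stage finished its last frame
    finish = []
    for _ in range(n_frames):
        ready = 0  # assumption: the next frame is always available immediately
        for s, duration in enumerate(stage_times):
            start = max(ready, stage_free_at[s])
            stage_free_at[s] = start + duration
            ready = stage_free_at[s]
        finish.append(ready)
    return finish


if __name__ == "__main__":
    five_equal = [1, 1, 1, 1, 1]  # five stages, one "clock cycle" each
    print(sequential_finish_times(five_equal, 10)[-1])  # 50
    print(pipelined_finish_times(five_equal, 10)[-1])   # 14
    print(pipelined_finish_times([1, 3, 1], 3))         # [5, 8, 11]
```

Once the pipeline has filled (5 cycles of latency), one frame completes per cycle. With uneven stages, the slowest stage sets the rate: with stage times [1, 3, 1], frames complete every 3 time units. That is why it pays to give the slowest stage several workers.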

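So I was thinking of using Python's multiprocessing library for this. However, I have no experience with it beyond some simple test programs, so I'm not sure which tool suits this use case best: queues, pools, managers, ...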

Solved:

This can be done with mpipe, a neat pipelining toolkit for Python: http://vmlaker.github.io/mpipe/

import mpipe

while True:
    # A new pipeline (3 worker processes per stage) is built for every batch of frames
    stage1 = mpipe.OrderedStage(conversion, 3)
    stage2 = mpipe.OrderedStage(processing, 3)
    stage3 = mpipe.OrderedStage(tracking, 3)
    stage4 = mpipe.OrderedStage(draw_squares, 3)
    stage5 = mpipe.OrderedStage(ui, 3)

    pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3.link(stage4.link(stage5)))))

    # Read 3 frames, retrying each read until it succeeds
    images = []
    while len(images) < 3:
        ret = False
        while not ret:
            ret, image = cap.read()
        images.append(image)

    # Each task carries the image plus the extra state the stages need
    for i in images:
        t = (i, frame_counter, multi_tracker)
        pipe.put(t)

    # None signals the end of input, so results() stops after the last frame
    pipe.put(None)

    for result in pipe.results():
        image, multi_tracker, frame_counter = result
        Show.show_win("video", image)

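As @r_e suggested, I read several images at the start and fill a pipeline with them. Now several worker processes are started for each computation step, so that each one can work on a separate image.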

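Since some additional information has to be passed along besides the image, I simply return the image together with the additional information and unpack it again in the next stage.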
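The same tuple-passing idea can be sketched with only the standard library. This is a minimal sketch, not mpipe: it assumes each stage is a module-level function that takes and returns a (data, frame_counter) tuple. The stage bodies below are stand-ins, and the chain helper is hypothetical. Pool.imap yields results in input order, much like an OrderedStage.

```python
from multiprocessing import Pool


def conversion(item):
    # Unpack the tuple from the previous stage, work on it, return a new tuple
    frame, frame_counter = item
    gray = [value // 2 for value in frame]  # stand-in for the real image conversion
    return gray, frame_counter


def processing(item):
    image, frame_counter = item
    markers = [i for i, value in enumerate(image) if value > 100]  # stand-in "detection"
    return markers, frame_counter


def chain(items, stages):
    """Feed items through (function, map_like) stages; map_like must preserve order."""
    for func, map_like in stages:
        items = map_like(func, items)
    return items


def main():
    frames = ([60 * i, 250, 10] for i in range(6))  # hypothetical fake frames
    inputs = ((frame, n) for n, frame in enumerate(frames))
    # One pool per stage, so each stage has its own workers and its own task feeder
    with Pool(3) as p1, Pool(3) as p2:
        for markers, frame_counter in chain(inputs, [(conversion, p1.imap), (processing, p2.imap)]):
            print(frame_counter, markers)


if __name__ == "__main__":
    main()
```

The sketch uses one pool per stage deliberately. With a single shared pool, the second imap may not start until the first stage's input is exhausted, and with a live camera that never happens. Note also that imap pulls input eagerly, so if the camera outpaces the stages, frames can pile up in memory.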

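For now I had to disable tracking, so I can't compare it with the old version. At the moment it's a bit slower. Tracking should improve the speed, because then I don't need to detect the objects in every frame, only every 30th frame. I'll post an update if I get it working.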

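I can't comment because I don't have 50 reputation. I have no experience with this either, but a little searching led me to the following website, which discusses real-time video processing with the multiprocessing library. I hope it helps.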

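1) Read frames and put them into the input queue, each with its corresponding frame number:

# vs is the cv2.VideoCapture being read from
# Check input queue is not full
if not input_q.full():
    # Read frame and store in input queue
    ret, frame = vs.read()
    if ret:
        input_q.put((int(vs.get(cv2.CAP_PROP_POS_FRAMES)), frame))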

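2) Take frames from the input queue, process them, and put each result into the output queue with its frame number:

# Worker loop: runs in each worker process
while True:
    frame = input_q.get()
    frame_rgb = cv2.cvtColor(frame[1], cv2.COLOR_BGR2RGB)
    output_q.put((frame[0], detect_objects(frame_rgb, sess, detection_graph)))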

3) If the output queue is not empty, take the processed frame from it and feed it to the priority queue:

# Check output queue is not empty
if not output_q.empty():
    # Recover treated frame in output queue and feed priority queue
    output_pq.put(output_q.get())

4) Draw frames in order until the output queue is empty:

# Check output priority queue is not empty
if not output_pq.empty():
    prior, output_frame = output_pq.get()
    if prior > countWriteFrame:
        # Not this frame's turn yet: put it back
        output_pq.put((prior, output_frame))
    else:
        countWriteFrame = countWriteFrame + 1
        # Draw something with your frame

5) Finally, to stop, check whether reading has ended and all queues are empty. If so, break:

if (not ret) and input_q.empty() and output_q.empty() and output_pq.empty():
    break

The link can be found HERE.
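In step 4, a frame is put back on the priority queue whenever its turn hasn't come yet, so the loop keeps spinning. Below is a sketch of the same reordering idea using heapq, which holds out-of-order frames until the missing ones arrive. The ReorderBuffer class is hypothetical and not from the article. The first frame number defaults to 1 because CAP_PROP_POS_FRAMES reports the index of the next frame to be read, which is 1 after the first read. For live cameras that property may not be meaningful, so a counter you increment yourself is safer.

```python
import heapq


class ReorderBuffer:
    """Collect (frame_number, frame) results that may arrive out of order
    and release frames strictly in frame-number order."""

    def __init__(self, first=1):
        self._heap = []     # min-heap keyed on frame number
        self._next = first  # frame number to release next

    def push(self, frame_number, frame):
        # Frame numbers must be unique, so the frames themselves are never compared
        heapq.heappush(self._heap, (frame_number, frame))

    def pop_ready(self):
        """Return all frames that can now be shown in order (possibly none)."""
        ready = []
        while self._heap and self._heap[0][0] == self._next:
            ready.append(heapq.heappop(self._heap)[1])
            self._next += 1
        return ready


if __name__ == "__main__":
    buf = ReorderBuffer()
    buf.push(2, "frame 2")
    print(buf.pop_ready())  # [] because frame 1 has not arrived yet
    buf.push(1, "frame 1")
    print(buf.pop_ready())  # ['frame 1', 'frame 2']
```

In the article's loop, this would replace the get/put-back cycle of step 4. You would push each (frame number, frame) pair taken from output_q and draw whatever pop_ready() returns.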

I had a go at this. It is largely based on your diagram and uses a 5-stage pipeline and multiprocessing. Start reading near the end, at def main():

#!/usr/bin/env python3

import logging
import numpy as np
from time import sleep
from multiprocessing import Process, Queue

# Defined at module level (not under the __main__ guard) so that child processes
# also see it when the "spawn" start method is used (e.g. Windows, macOS)
NFRAMES = 1000

class Stage1(Process):
    """Acquire frames as fast as possible and send to next stage"""
    def __init__(self, oqueue):
        super().__init__()
        # Pick up parameters and store in instance variables
        self.oqueue = oqueue      # output queue

    def run(self):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage1 %(message)s',
                        filename='log-stage1.txt', filemode='w')
        logging.info('started')

        # Generate frames and send down pipeline
        for f in range(NFRAMES):
            logging.debug('Generating frame %d',f)
            # Generate frame of random stuff
            frame = np.random.randint(0,256,(480,640,3), dtype=np.uint8)
            logging.debug('Forwarding frame %d',f)
            self.oqueue.put(frame)

class Stage2(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in instance variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage2 %(message)s',
                        filename='log-stage2.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage3(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in instance variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage3 %(message)s',
                        filename='log-stage3.txt', filemode='w')
        logging.info('started')
        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage4(Process):
    """Read frames from previous stage as fast as possible, process and send to next stage"""
    def __init__(self, iqueue, oqueue):
        super().__init__()
        # Pick up parameters and store in instance variables
        self.iqueue = iqueue      # input queue
        self.oqueue = oqueue      # output queue

    def run(self):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage4 %(message)s',
                        filename='log-stage4.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Received frame %d', f)
            # Process frame ...

            logging.debug('Forwarding frame %d', f)
            self.oqueue.put(frame)

class Stage5(Process):
    """Read frames from previous stage as fast as possible, and display"""
    def __init__(self, iqueue):
        super().__init__()
        # Pick up parameters and store in instance variables
        self.iqueue = iqueue      # input queue

    def run(self):
        # Turn on logging
        logging.basicConfig(level=logging.DEBUG,
                        format='%(created).6f [%(levelname)s] Stage5 %(message)s',
                        filename='log-stage5.txt', filemode='w')
        logging.info('started')

        for f in range(NFRAMES):
            # Wait for next frame
            frame = self.iqueue.get()
            logging.debug('Displaying frame %d', f)
            # Display frame ...

def main():
    # Create Queues to send data between pipeline stages
    q1_2 = Queue(5)    # queue between stages 1 and 2
    q2_3 = Queue(5)    # queue between stages 2 and 3
    q3_4 = Queue(5)    # queue between stages 3 and 4
    q4_5 = Queue(5)    # queue between stages 4 and 5

    # Create Processes for stages of pipeline
    stages = []
    stages.append(Stage1(q1_2))
    stages.append(Stage2(q1_2,q2_3))
    stages.append(Stage3(q2_3,q3_4))
    stages.append(Stage4(q3_4,q4_5))
    stages.append(Stage5(q4_5))

    # Start the stages
    for stage in stages:
        stage.start()

    # Wait for stages to finish
    for stage in stages:
        stage.join()

if __name__ == "__main__":
    main()

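At the moment it just generates a frame of random noise and passes it down the pipeline. It logs each process to a separate file. Because of filemode='w', that file is overwritten on each new run of the program. You can see the individual logs like this: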

-rw-r--r--  1 mark  staff  1097820 26 Jun 17:07 log-stage1.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage2.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage3.txt
-rw-r--r--  1 mark  staff  1077820 26 Jun 17:07 log-stage4.txt
-rw-r--r--  1 mark  staff   548930 26 Jun 17:07 log-stage5.txt

Then you can see when each process received and sent each frame:

more log-stage1.txt

1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
1561565618.648173 [DEBUG] Stage1 Generating frame 2
1561565618.687316 [DEBUG] Stage1 Forwarding frame 2

Or trace, say, "frame 1" through the stages:

pi@pi3:~ $ grep "frame 1$" log*

log-stage1.txt:1561565618.625659 [DEBUG] Stage1 Generating frame 1
log-stage1.txt:1561565618.647139 [DEBUG] Stage1 Forwarding frame 1
log-stage2.txt:1561565618.671272 [DEBUG] Stage2 Received frame 1
log-stage2.txt:1561565618.672272 [DEBUG] Stage2 Forwarding frame 1
log-stage3.txt:1561565618.713618 [DEBUG] Stage3 Received frame 1
log-stage3.txt:1561565618.715468 [DEBUG] Stage3 Forwarding frame 1
log-stage4.txt:1561565618.746488 [DEBUG] Stage4 Received frame 1
log-stage4.txt:1561565618.747617 [DEBUG] Stage4 Forwarding frame 1
log-stage5.txt:1561565618.790802 [DEBUG] Stage5 Displaying frame 1

Or merge all the logs in chronological order:

sort -g log*

1561565618.603456 [INFO] Stage1 started
1561565618.604812 [DEBUG] Stage1 Generating frame 0
1561565618.607765 [INFO] Stage2 started
1561565618.612311 [INFO] Stage3 started
1561565618.618425 [INFO] Stage4 started
1561565618.618785 [INFO] Stage5 started
1561565618.623938 [DEBUG] Stage1 Forwarding frame 0
1561565618.625659 [DEBUG] Stage1 Generating frame 1
1561565618.640585 [DEBUG] Stage2 Received frame 0
1561565618.642438 [DEBUG] Stage2 Forwarding frame 0
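The stages above stop after a fixed NFRAMES, which a live camera feed doesn't have. A common alternative is to pass an end-of-stream marker down the pipeline. The sketch below assumes None as that sentinel and uses hypothetical stand-in functions (source, double, increment) in place of the real capture and processing steps. Each stage forwards the sentinel before exiting, so the shutdown ripples through in order.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-stream marker passed down the pipeline


def source(n_frames, oqueue):
    """Stand-in for the capture stage: emit frame numbers, then the sentinel."""
    for i in range(n_frames):
        oqueue.put(i)
    oqueue.put(SENTINEL)


def stage_worker(func, iqueue, oqueue):
    """Apply func to each item until the sentinel arrives, then forward it and stop."""
    while True:
        item = iqueue.get()
        if item is SENTINEL:
            oqueue.put(SENTINEL)  # let the next stage shut down as well
            break
        oqueue.put(func(item))


def double(x):  # hypothetical processing step
    return x * 2


def increment(x):  # hypothetical processing step
    return x + 1


def main():
    q1_2, q2_3, q3_out = Queue(5), Queue(5), Queue(5)
    procs = [
        Process(target=source, args=(10, q1_2)),
        Process(target=stage_worker, args=(double, q1_2, q2_3)),
        Process(target=stage_worker, args=(increment, q2_3, q3_out)),
    ]
    for p in procs:
        p.start()

    # Drain the last queue before joining; a stage blocked on a full queue never exits
    results = []
    while True:
        item = q3_out.get()
        if item is SENTINEL:
            break
        results.append(item)

    for p in procs:
        p.join()
    print(results)  # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]


if __name__ == "__main__":
    main()
```

With several workers reading from the same queue, each worker needs its own sentinel. Put one per worker, and have the next stage wait for as many sentinels as there are upstream workers.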

So, with the help of r_e, I found a neat toolkit called mpipe that can be used for pipelining in Python.

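While testing, I found that importing and displaying the images is much faster than converting, processing and drawing the UI, so I only use a 3-stage pipeline.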

It's pretty easy to use:

import mpipe

def conversion(input_data):
    original, frame_counter = input_data
    ...
    return con.gbg(con.down_sample(con.copy_img(original))), frame_counter

def processing(input_data):
    image, frame_counter = input_data
    ...
    return markers, frame_counter


def ui(input_data):
    markers, frame_counter = input_data
    ...
    return image, frame_counter, triangle


def main():
    ...
    while True:
        stage1 = mpipe.OrderedStage(conversion, 3)

        stage2 = mpipe.OrderedStage(processing, 3)

        stage3 = mpipe.OrderedStage(ui, 3)

        pipe = mpipe.Pipeline(stage1.link(stage2.link(stage3)))

        images = []
        while len(images) < 3:
            ret = False
            while not ret:
                ret, image = cap.read()
            images.append(image)

        for i in images:
            t = (i, frame_counter)
            pipe.put(t)

        # None signals the end of input, so results() stops after the last frame
        pipe.put(None)

        for result in pipe.results():
            image, frame_counter, triangle = result
            if not triangle:
                if t_count > 6:
                    Show.show_win("video", image)
                    t_count = 0
                else:
                    t_count += 1
            else:
                Show.show_win("video", image)