
Simultaneously map video and data streams to one subprocess pipeline in real-time

我需要在 OpenCV/Python 中同时实时处理视频流和 klvdata 流。我正在使用 FFMPEG 读取文件或流,因为 OpenCV 不保留 klvdata。我使用子进程模块将数据传递给 OpenCV。

我的问题是我不知道如何将视频和 klvdata 同时映射到同一个子进程管道?


#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy

# FFmpeg command line: map the first stream and the data stream(s) of the
# input and write raw BGR frames to stdout ('-').
# NOTE(review): '-c:d', 'copy' is placed after the output URL '-';
# presumably FFmpeg treats it as a trailing option - confirm.
command = ['ffmpeg',
    '-i', 'DayFlight.mpg',
    '-map', '0:0',
    '-map', '0:d',
    '-pix_fmt', 'bgr24',
    '-c:v', 'rawvideo',
    '-f', 'image2pipe', '-',
    '-c:d', 'copy',
    ]

# Run FFmpeg as a sub-process with all three standard streams piped.
pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)

while True:
   # Read one frame worth of bytes (1280x720 pixels, 3 bytes per pixel).
   raw_image = pipe.stdout.read(1280*720*3)
   # frombuffer replaces the deprecated binary mode of numpy.fromstring.
   image = numpy.frombuffer(raw_image, dtype='uint8')
   image = image.reshape((720,1280,3))
   if image is not None:
      cv2.imshow('Video', image)
   # Leave the loop when the user presses 'q'.
   if cv2.waitKey(1) & 0xFF == ord('q'):
      break
   # Parse KLV packets from the same stdout pipe the frames are read from.
   for packet in klvdata.StreamParser(pipe.stdout):
      metadata = packet.MetadataList()

Traceback (most recent call last):
  File "test_cv.py", line 32, in <module>
    metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'



Day Flight.mpg示例文件的数据包比帧少得多,使用建议的解决方案无法准确同步(我认为使用管道方法不可能)。
我们仍然可以应用一些粗略的同步 - 假设数据和帧是在时间接近的情况下读取的。


                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------    
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---      
| Data      |      |             |    |    
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)


  • 视频 reader 线程 - 读取 BGR 格式的原始视频帧。
  • 数据 reader 线程 - 读取并解析 KLV 数据。


从标准输出管道读取 KLV 数据:
从管道读取数据时(以类似实时的方式),我们需要知道预期要读取的字节数。
这迫使我们对 KLV 数据进行部分解析:

  • 读取“键”(Key,假设长度为 16 字节)。
  • 读取“长度”(Length)——“BER 长度”编码标准带来了一些挑战。
  • 读取“数据”(Value,要读取的大小由长度定义)。

读取键、长度和数据后,我们得到一个“KLV 数据包”,可以将其发送给 KLV 数据解析器。

这是与 Day Flight.mpg 示例输入文件一起工作的代码示例:

#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO

# Video reader thread.
def video_reader(pipe, cols=1280, rows=720, show_every=60):
    """Read raw video frames from *pipe* and display a subset of them.

    Frames are read back to back until the pipe yields fewer bytes than one
    full frame (end of stream or a truncated frame).

    Args:
        pipe: Binary file-like object whose ``read(n)`` returns frame bytes.
        cols: Frame width in pixels (defaults to 1280).
        rows: Frame height in pixels (defaults to 720).
        show_every: Display one frame out of every ``show_every`` frames.
    """
    # 3 bytes per pixel: each frame is reshaped to (rows, cols, 3) uint8.
    frame_size = cols * rows * 3

    counter = 0
    while True:
        raw_image = pipe.read(frame_size)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < frame_size:
            break

        if counter % show_every == 0:
            # Show video frame every `show_every` frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image)  # Show video image for testing
            # imshow only draws once the GUI event loop is serviced.
            cv2.waitKey(1)
        counter += 1

# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Interpret *value* as a big-endian integer and return it.

    *value* is first converted with ``bytes()``; *signed* selects two's
    complement decoding.
    """
    raw = bytes(value)
    return int.from_bytes(raw, 'big', signed=signed)

# Data reader thread (read KLV data).
def data_reader(pipe):
    """Read KLV packets from *pipe*, mirror them to data.bin and parse them.

    Each packet is read in three steps - key, BER-encoded length, value - so
    that only the exact number of bytes belonging to the packet is requested
    from the pipe. Every chunk read is also written to ``data.bin`` for
    testing. The loop ends as soon as any read returns fewer bytes than
    requested.

    Args:
        pipe: Binary file-like object whose ``read(n)`` returns KLV bytes.
    """
    key_length = 16  # Assume key length is 16 bytes.

    # For testing - store the KLV data to data.bin (binary file).
    # The context manager closes the file on every exit path.
    with open('data.bin', 'wb') as f:
        while True:
            # https://en.wikipedia.org/wiki/KLV
            # The first few bytes are the Key, much like a key in a standard hash table data structure.
            # Keys can be 1, 2, 4, or 16 bytes in length.
            # Presumably in a separate specification document you would agree on a key length for a given application.
            key = pipe.read(key_length)  # Read the key
            if len(key) < key_length:
                break  # Break the loop when length is too small
            f.write(key)  # Write data to binary file for testing

            # https://github.com/paretech/klvdata/tree/master/klvdata
            # Length field
            len_byte = pipe.read(1)

            if len(len_byte) < 1:
                break  # Break the loop when length is too small
            f.write(len_byte)  # Write data to binary file for testing

            byte_length = bytes_to_int(len_byte)

            # https://github.com/paretech/klvdata/tree/master/klvdata
            if byte_length < 128:
                # BER Short Form: the byte itself is the value length.
                length = byte_length
                ber_len_bytes = b''
            else:
                # BER Long Form: the byte minus 128 is the number of
                # following bytes that hold the value length.
                ber_len = byte_length - 128
                ber_len_bytes = pipe.read(ber_len)

                if len(ber_len_bytes) < ber_len:
                    break  # Break the loop when length is too small
                f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

                length = bytes_to_int(ber_len_bytes)

            # Read the value (length bytes)
            value = pipe.read(length)
            if len(value) < length:
                break  # Break the loop when length is too small
            f.write(value)  # Write data to binary file for testing

            klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key length and data
            klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

            # Parse the KLV data
            for packet in klvdata.StreamParser(klv_data_as_bytes_io):
                metadata = packet.MetadataList()
                print(metadata)
                print()  # New line

# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '                        # Set loglevel to quiet for disabling the prints to stderr
                               '-i "Day Flight.mpg" '                                        # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '            # Copy the data without decoding.
                               '-report'),                                                   # Create a log file (because we can't see the statuses that are usually printed to stderr).
                                stdout=sp.PIPE, stderr=sp.PIPE)

# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()

# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()

# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()

data.bin 可用于一致性检查。
执行 FFmpeg CLI 以提取数据流:

ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin

验证 raw.bin 与 data.bin 文件是否相等。