同时实时将视频和数据流映射到一个子流程管道
Simultaneously map video and data streams to one subprocess pipeline in real-time
我需要在 OpenCV/Python 中同时实时处理视频流和 klvdata 流。我正在使用 FFMPEG 读取文件或流,因为 OpenCV 不保留 klvdata。我使用子进程模块将数据传递给 OpenCV。
我的问题是我不知道如何将视频和 klvdata 同时映射到同一个子进程管道?
我的代码:
#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy
command = ['ffmpeg',
'-i', 'DayFlight.mpg',
'-map', '0:0',
'-map', '0:d',
'-pix_fmt', 'bgr24',
'-c:v', 'rawvideo',
'-an','-sn',
'-f', 'image2pipe', '-',
'-c:d', 'copy',
'-f','data',
]
pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)
while True:
raw_image = pipe.stdout.read(1280*720*3)
image = numpy.fromstring(raw_image, dtype='uint8')
image = image.reshape((720,1280,3))
if image is not None:
cv2.imshow('Video', image)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
for packet in klvdata.StreamParser(pipe.stdout):
metadata = packet.MetadataList()
print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()
产生以下错误:
Traceback (most recent call last):
File "test_cv.py", line 32, in <module>
metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'
非常感谢任何帮助。
为了分割视频和数据,我们可以将视频流映射到stderr
管道,将KLV数据流映射到stdout
管道。
我的 following answer.
中使用了相同的技术来分离视频和音频
当每个视频帧都有私有KLV数据(按顺序同步)时,视频帧和相应数据之间的精确同步相对简单。
Day Flight.mpg示例文件的数据包比帧少得多,使用建议的解决方案无法准确同步(我认为使用管道方法不可能)。
我们仍然可以应用一些粗略的同步 - 假设数据和帧是在时间接近的情况下读取的。
建议的视频和数据分割方式:
-----------
--->| Raw Video | ---> stderr (pipe)
----------- ------------- | -----------
| Input | | FFmpeg | |
| Video with| ---> | sub-process | ---
| Data | | | |
----------- ------------- | -----------
--->| KLV data | ---> stdout (pipe)
-----------
视频和数据在两个单独的线程中读取:
- 视频 reader 线程 - 读取原始视频帧(BGR)格式。
- 数据reader 线程 - 读取和解析 KLV 数据。
根据Wikipedia,KLV格式定义不明确:
Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.
示例视频中,密钥长度为16字节,但不保证...
正在从标准输出管道读取 KLV 数据:
从管道读取数据时(以 real-time 之类的方式),我们需要知道要读取的预期字节数。
这迫使我们对 KLV 数据进行部分解析:
- 读取“密钥”(假设长度为 16 字节)。
- 阅读“长度”-“BER 长度”标准存在一些挑战。
- 读取“数据”(要读取的大小由长度定义)。
读取密钥、长度和数据后,我们得到一个“KLV数据包”,我们可以发送到KLV data parser。
这是与 Day Flight.mpg
示例输入文件一起工作的代码示例:
#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO
# Video reader thread.
def video_reader(pipe):
cols, rows = 1280, 720 # Assume we know frame size is 1280x720
counter = 0
while True:
raw_image = pipe.read(cols*rows*3) # Read raw video frame
# Break the loop when length is too small
if len(raw_image) < cols*rows*3:
break
if (counter % 60) == 0:
# Show video frame evey 60 frames
image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
cv2.imshow('Video', image) # Show video image for testing
cv2.waitKey(1)
counter += 1
# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
"""Return integer given bytes."""
return int.from_bytes(bytes(value), byteorder='big', signed=signed)
# Data reader thread (read KLV data).
def data_reader(pipe):
key_length = 16 # Assume key length is 16 bytes.
f = open('data.bin', 'wb') # For testing - store the KLV data to data.bin (binary file)
while True:
# https://en.wikipedia.org/wiki/KLV
# The first few bytes are the Key, much like a key in a standard hash table data structure.
# Keys can be 1, 2, 4, or 16 bytes in length.
# Presumably in a separate specification document you would agree on a key length for a given application.
key = pipe.read(key_length) # Read the key
if len(key) < key_length:
break # Break the loop when length is too small
f.write(key) # Write data to binary file for testing
# https://github.com/paretech/klvdata/tree/master/klvdata
# Length field
len_byte = pipe.read(1)
if len(len_byte) < 1:
break # Break the loop when length is too small
f.write(len_byte) # Write data to binary file for testing
byte_length = bytes_to_int(len_byte)
# https://github.com/paretech/klvdata/tree/master/klvdata
if byte_length < 128:
# BER Short Form
length = byte_length
ber_len_bytes = b''
else:
# BER Long Form
ber_len = byte_length - 128
ber_len_bytes = pipe.read(ber_len)
if len(ber_len_bytes) < ber_len:
break # Break the loop when length is too small
f.write(ber_len_bytes) # Write ber_len_bytes to binary file for testing
length = bytes_to_int(ber_len_bytes)
# Read the value (length bytes)
value = pipe.read(length)
if len(value) < length:
break # Break the loop when length is too small
f.write(value) # Write data to binary file for testing
klv_data = key + len_byte + ber_len_bytes + value # Concatenate key length and data
klv_data_as_bytes_io = BytesIO(klv_data) # Wrap klv_data with BytesIO (before parsing)
# Parse the KLV data
for packet in klvdata.StreamParser(klv_data_as_bytes_io):
metadata = packet.MetadataList()
print(metadata)
print() # New line
# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet ' # Set loglevel to quiet for disabling the prints ot stderr
'-i "Day Flight.mpg" ' # Input video "Day Flight.mpg"
'-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
'-map 0:d -c copy -copy_unknown -f:d data pipe:1 ' # Copy the data without ddecoding.
'-report'), # Create a log file (because we can't the statuses that are usually printed to stderr).
stdout=sp.PIPE, stderr=sp.PIPE)
# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()
# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()
# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()
以上代码将数据保存到data.bin
(用于测试)。
data.bin
可用于一致性检查。
执行 FFmpeg CLI 以提取数据流:
ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin
验证 raw.bin
和 data.bin
文件是否相等。
我需要在 OpenCV/Python 中同时实时处理视频流和 klvdata 流。我正在使用 FFMPEG 读取文件或流,因为 OpenCV 不保留 klvdata。我使用子进程模块将数据传递给 OpenCV。
我的问题是我不知道如何将视频和 klvdata 同时映射到同一个子进程管道?
我的代码:
#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy
command = ['ffmpeg',
'-i', 'DayFlight.mpg',
'-map', '0:0',
'-map', '0:d',
'-pix_fmt', 'bgr24',
'-c:v', 'rawvideo',
'-an','-sn',
'-f', 'image2pipe', '-',
'-c:d', 'copy',
'-f','data',
]
pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)
while True:
raw_image = pipe.stdout.read(1280*720*3)
image = numpy.fromstring(raw_image, dtype='uint8')
image = image.reshape((720,1280,3))
if image is not None:
cv2.imshow('Video', image)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
for packet in klvdata.StreamParser(pipe.stdout):
metadata = packet.MetadataList()
print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()
产生以下错误:
Traceback (most recent call last):
File "test_cv.py", line 32, in <module>
metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'
非常感谢任何帮助。
为了分割视频和数据,我们可以将视频流映射到stderr
管道,将KLV数据流映射到stdout
管道。
我的 following answer.
中使用了相同的技术来分离视频和音频当每个视频帧都有私有KLV数据(按顺序同步)时,视频帧和相应数据之间的精确同步相对简单。
Day Flight.mpg示例文件的数据包比帧少得多,使用建议的解决方案无法准确同步(我认为使用管道方法不可能)。
我们仍然可以应用一些粗略的同步 - 假设数据和帧是在时间接近的情况下读取的。
建议的视频和数据分割方式:
-----------
--->| Raw Video | ---> stderr (pipe)
----------- ------------- | -----------
| Input | | FFmpeg | |
| Video with| ---> | sub-process | ---
| Data | | | |
----------- ------------- | -----------
--->| KLV data | ---> stdout (pipe)
-----------
视频和数据在两个单独的线程中读取:
- 视频 reader 线程 - 读取原始视频帧(BGR)格式。
- 数据reader 线程 - 读取和解析 KLV 数据。
根据Wikipedia,KLV格式定义不明确:
Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.
示例视频中,密钥长度为16字节,但不保证...
正在从标准输出管道读取 KLV 数据:
从管道读取数据时(以 real-time 之类的方式),我们需要知道要读取的预期字节数。
这迫使我们对 KLV 数据进行部分解析:
- 读取“密钥”(假设长度为 16 字节)。
- 阅读“长度”-“BER 长度”标准存在一些挑战。
- 读取“数据”(要读取的大小由长度定义)。
读取密钥、长度和数据后,我们得到一个“KLV数据包”,我们可以发送到KLV data parser。
这是与 Day Flight.mpg
示例输入文件一起工作的代码示例:
#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO
# Video reader thread.
def video_reader(pipe):
cols, rows = 1280, 720 # Assume we know frame size is 1280x720
counter = 0
while True:
raw_image = pipe.read(cols*rows*3) # Read raw video frame
# Break the loop when length is too small
if len(raw_image) < cols*rows*3:
break
if (counter % 60) == 0:
# Show video frame evey 60 frames
image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
cv2.imshow('Video', image) # Show video image for testing
cv2.waitKey(1)
counter += 1
# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
"""Return integer given bytes."""
return int.from_bytes(bytes(value), byteorder='big', signed=signed)
# Data reader thread (read KLV data).
def data_reader(pipe):
key_length = 16 # Assume key length is 16 bytes.
f = open('data.bin', 'wb') # For testing - store the KLV data to data.bin (binary file)
while True:
# https://en.wikipedia.org/wiki/KLV
# The first few bytes are the Key, much like a key in a standard hash table data structure.
# Keys can be 1, 2, 4, or 16 bytes in length.
# Presumably in a separate specification document you would agree on a key length for a given application.
key = pipe.read(key_length) # Read the key
if len(key) < key_length:
break # Break the loop when length is too small
f.write(key) # Write data to binary file for testing
# https://github.com/paretech/klvdata/tree/master/klvdata
# Length field
len_byte = pipe.read(1)
if len(len_byte) < 1:
break # Break the loop when length is too small
f.write(len_byte) # Write data to binary file for testing
byte_length = bytes_to_int(len_byte)
# https://github.com/paretech/klvdata/tree/master/klvdata
if byte_length < 128:
# BER Short Form
length = byte_length
ber_len_bytes = b''
else:
# BER Long Form
ber_len = byte_length - 128
ber_len_bytes = pipe.read(ber_len)
if len(ber_len_bytes) < ber_len:
break # Break the loop when length is too small
f.write(ber_len_bytes) # Write ber_len_bytes to binary file for testing
length = bytes_to_int(ber_len_bytes)
# Read the value (length bytes)
value = pipe.read(length)
if len(value) < length:
break # Break the loop when length is too small
f.write(value) # Write data to binary file for testing
klv_data = key + len_byte + ber_len_bytes + value # Concatenate key length and data
klv_data_as_bytes_io = BytesIO(klv_data) # Wrap klv_data with BytesIO (before parsing)
# Parse the KLV data
for packet in klvdata.StreamParser(klv_data_as_bytes_io):
metadata = packet.MetadataList()
print(metadata)
print() # New line
# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet ' # Set loglevel to quiet for disabling the prints ot stderr
'-i "Day Flight.mpg" ' # Input video "Day Flight.mpg"
'-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
'-map 0:d -c copy -copy_unknown -f:d data pipe:1 ' # Copy the data without ddecoding.
'-report'), # Create a log file (because we can't the statuses that are usually printed to stderr).
stdout=sp.PIPE, stderr=sp.PIPE)
# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()
# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()
# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()
以上代码将数据保存到data.bin
(用于测试)。
data.bin
可用于一致性检查。
执行 FFmpeg CLI 以提取数据流:
ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin
验证 raw.bin
和 data.bin
文件是否相等。