来自 GigE 相机的实时视频
Live video from GigE Cameras
我在 QML 中显示来自 2 台 GigE 相机的实时视频流时遇到了问题。之前我用 QLabel 和 QPixmap 实现过,没有任何问题。但 QML 的 Label 没有 pixmap 属性,因此无法通过信号和槽把图像发送给它。
这是我的 Python 代码:
import sys
import os
from PySide2.QtGui import QGuiApplication
from PySide2.QtQml import QQmlApplicationEngine
from PySide2.QtGui import QImage, QPixmap
from PySide2.QtCore import Slot, QThread, Signal, Qt, QObject
import cv2
from pypylon import pylon
# Discover the connected cameras once at import time and attach at most two
# of them to a shared InstantCameraArray used by the grabbing thread.
tlFactory = pylon.TlFactory.GetInstance()
devices = tlFactory.EnumerateDevices()
if len(devices) == 0:
    raise pylon.RuntimeException("No camera present.")

cameras = pylon.InstantCameraArray(min(len(devices), 2))
for index, camera in enumerate(cameras):
    # Slot `index` of the array is bound to the device found at the same index.
    device = tlFactory.CreateDevice(devices[index])
    camera.Attach(device)
class CamThread(QThread):
    """Worker thread that grabs frames from the cameras and emits them as QImages.

    ``cam1`` carries frames of the first camera in the module-level ``cameras``
    array and ``cam2`` those of the second one. Each camera is handled
    independently, so a setup with a single attached camera works and simply
    never emits ``cam2``.
    """

    cam1 = Signal(QImage)
    cam2 = Signal(QImage)

    def run(self):
        """Grab frames until grabbing stops or an error occurs.

        Any exception (for example a retrieve timeout) is printed and ends
        the loop, which matches the original best-effort behaviour.
        """
        cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
        # zip() stops at the shorter sequence, so cameras[1] is never touched
        # when only one device was attached.
        outputs = list(zip(cameras, (self.cam1, self.cam2)))
        try:
            while cameras.IsGrabbing():
                for camera, signal in outputs:
                    grab_result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    try:
                        if grab_result.GrabSucceeded():
                            signal.emit(self._to_qimage(grab_result.GetArray()))
                    finally:
                        # Hand the buffer back to the grab engine right away.
                        grab_result.Release()
        except Exception as error:
            print(error)

    @staticmethod
    def _to_qimage(frame):
        """Convert a YUV422 frame into a scaled RGB888 QImage owning its data."""
        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_Y422)
        height, width, channels = rgb.shape
        # The QImage must describe the converted RGB buffer: same array for
        # the pixel data, the dimensions and the stride.
        image = QImage(
            rgb.data, width, height, channels * width, QImage.Format_RGB888
        )
        # QImage does not copy the numpy buffer; copy() detaches the result so
        # it stays valid after `rgb` is garbage collected.
        return image.scaled(800, 746, Qt.KeepAspectRatio).copy()
class MainWindow(QObject):
    """Backend object exposed to QML that owns the camera grabbing thread."""

    def __init__(self):
        super().__init__()
        worker = CamThread()
        # Route each camera's frames to its own slot before the thread runs.
        worker.cam1.connect(self.camera1)
        worker.cam2.connect(self.camera2)
        self.CamThread = worker
        worker.start()

    @Slot(QImage)
    def camera1(self, image):
        """Receive a frame from the first camera (currently a no-op)."""
        return None

    @Slot(QImage)
    def camera2(self, image):
        """Receive a frame from the second camera (currently a no-op)."""
        return None
if __name__ == "__main__":
    # Start the Qt application, expose the backend to QML as "backend" and
    # load main.qml from the directory containing this script.
    app = QGuiApplication(sys.argv)
    backend = MainWindow()
    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("backend", backend)
    qml_file = os.path.join(os.path.dirname(__file__), "main.qml")
    engine.load(qml_file)
    # No root objects means the QML file failed to load: exit with -1
    # without entering the event loop.
    exit_code = app.exec_() if engine.rootObjects() else -1
    sys.exit(exit_code)
那么如何使用QML/PySide2显示直播视频流呢?
我正在使用 Qt Design Studio。
Qt 提供了不同的方法来将 images/video 流传递给 QML:
1. 将 pixmap 转换为 base64 编码
// Encode a pixmap as a PNG data URI so it can be used as an image source.
QByteArray byteArray;
// The buffer writes into byteArray.
QBuffer buffer(&byteArray);
buffer.open(QIODevice::WriteOnly);
// Serialize the pixmap into the buffer in PNG format.
pixmap.save(&buffer,"PNG");
// Start the string with the data URI prefix for a base64-encoded PNG.
QString data("data:image/png;base64,");
// Append the base64 text of the PNG bytes after the prefix.
data.append(QString::fromLatin1(byteArray.toBase64().data()));
此 base64 编码图像可能会传递给 Image::source
2. 使用 QQuickImageProvider
它允许将自定义的 image://... URL 直接关联到 QPixmap 或 QImage。更多信息请查阅文档。
3. 使用 QtMultimedia
其中的 VideoOutput 尤其可能有用。
虽然 QQuickImageProvider 选项可能是一个很好的选项,但缺点是您必须生成不同的 url,更好的选择是使用 VideoOutput,例如在您的情况下,以下实现应该有效(未测试):
from functools import cached_property
import os
import random
import sys
import threading
import cv2
from PySide2.QtCore import Property, QObject, Qt, QSize, QTimer, Signal, Slot
from PySide2.QtGui import QColor, QGuiApplication, QImage
from PySide2.QtMultimedia import QAbstractVideoSurface, QVideoFrame, QVideoSurfaceFormat
from PySide2.QtQml import QQmlApplicationEngine
import shiboken2
from pypylon import pylon
class CameraProvider(QObject):
    """Grabs frames from the cameras on a background thread.

    ``imageChanged`` is emitted with the camera index and a frame each time
    a grab succeeds.
    """

    imageChanged = Signal(int, QImage)

    def start(self, cameras):
        """Start the daemon thread that polls *cameras* for frames."""
        threading.Thread(target=self._execute, args=(cameras,), daemon=True).start()

    def _execute(self, cameras):
        """Poll every camera in turn for as long as the array is grabbing.

        Errors for one camera are printed and do not stop the loop.
        """
        while cameras.IsGrabbing():
            for i, camera in enumerate(cameras):
                grab_result = None
                try:
                    grab_result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    if grab_result.GrabSucceeded():
                        img = grab_result.GetArray()
                        # FIXME: convert `img` to a QImage. Until then a
                        # randomly coloured placeholder frame is emitted.
                        qimage = QImage(800, 746, QImage.Format_RGB888)
                        qimage.fill(QColor(*random.sample(range(0, 255), 3)))
                        # Skip the emit if the underlying C++ object is gone.
                        if shiboken2.isValid(self):
                            self.imageChanged.emit(i, qimage.copy())
                except Exception as error:
                    print(error)
                finally:
                    # Hand the buffer back to the grab engine once it was
                    # actually retrieved.
                    if grab_result is not None:
                        grab_result.Release()
class CameraService(QObject):
    """Feeds QImage frames to a video surface exposed as ``videoSurface``."""

    surfaceChanged = Signal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._surface = None
        self._format = QVideoSurfaceFormat()
        self._format_is_valid = False

    def get_surface(self):
        """Return the attached video surface, or ``None`` if there is none."""
        return self._surface

    def set_surface(self, surface):
        """Attach *surface*: stop the old one if active, then start the new one."""
        previous = self._surface
        if previous is surface:
            return
        # Past the guard above the surface is known to differ, so only the
        # "exists and is active" part needs checking.
        if previous is not None and previous.isActive():
            previous.stop()
        self._surface = surface
        self.surfaceChanged.emit()
        if self._surface is not None:
            self._start_surface()

    videoSurface = Property(
        QAbstractVideoSurface,
        fget=get_surface,
        fset=set_surface,
        notify=surfaceChanged,
    )

    @Slot(QImage)
    def update_frame(self, qimage):
        """Present *qimage* on the surface; ignored without a surface or image."""
        if self.videoSurface is None or qimage.isNull():
            return
        if not self._format_is_valid:
            # The first frame fixes the surface format to its dimensions.
            self._set_format(qimage.width(), qimage.height(), QVideoFrame.Format_RGB32)
            self._format_is_valid = True
        image_format = QVideoFrame.imageFormatFromPixelFormat(
            QVideoFrame.Format_RGB32
        )
        qimage.convertTo(image_format)
        self._surface.present(QVideoFrame(qimage))

    def _set_format(self, width, height, pixel_format):
        """Store a new surface format and restart the surface with it, if any."""
        self._format = QVideoSurfaceFormat(QSize(width, height), pixel_format)
        if self._surface is None:
            return
        if self._surface.isActive():
            self._surface.stop()
        self._start_surface()

    def _start_surface(self):
        """Start the surface with the nearest format it offers for ours."""
        negotiated = self._surface.nearestFormat(self._format)
        self._format = negotiated
        self._surface.start(negotiated)
class CameraManager(QObject):
    """Owns one CameraService per camera and routes grabbed frames to them."""

    def __init__(self, cameras, parent=None):
        super().__init__(parent)
        self._services = []
        grabber = self.provider
        grabber.imageChanged.connect(self.handle_image_changed)
        grabber.start(cameras)
        # One service per camera, in camera order, so that the index carried
        # by imageChanged selects the matching service.
        self._services.extend(CameraService() for _ in cameras)

    @cached_property
    def provider(self):
        """Create the CameraProvider on first access and reuse it afterwards."""
        return CameraProvider()

    @Slot(int, QImage)
    def handle_image_changed(self, index, qimage):
        """Forward *qimage* to the service at position *index*."""
        service = self._services[index]
        service.update_frame(qimage)

    def get_services(self):
        """Return the list of services backing the ``services`` property."""
        return self._services

    services = Property("QVariantList", fget=get_services, constant=True)
def main():
    """Attach up to two cameras, start grabbing and run the QML application.

    Raises:
        pylon.RuntimeException: if no camera device is found.
    """
    app = QGuiApplication(sys.argv)

    tlFactory = pylon.TlFactory.GetInstance()
    devices = tlFactory.EnumerateDevices()
    if len(devices) == 0:
        raise pylon.RuntimeException("No camera present.")

    cameras = pylon.InstantCameraArray(min(len(devices), 2))
    for i, cam in enumerate(cameras):
        cam.Attach(tlFactory.CreateDevice(devices[i]))

    # The provider thread loops on cameras.IsGrabbing(); grabbing has to be
    # started first, otherwise that loop ends immediately and no frame is
    # ever delivered.
    cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

    manager = CameraManager(cameras)

    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("manager", manager)
    engine.load(os.path.join(os.path.dirname(__file__), "main.qml"))
    # No root objects means main.qml failed to load.
    if not engine.rootObjects():
        sys.exit(-1)
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()
import QtQuick 2.14
import QtQuick.Window 2.14
import QtMultimedia 5.14
// Main window: shows one VideoOutput tile per entry of manager.services.
Window {
visible: true
width: 640
height: 480
title: qsTr("Hello World")
GridView {
width: 300; height: 200
// Fall back to an empty model when the "manager" context property is null.
model: manager !== null ? manager.services : []
delegate: VideoOutput {
width: 100
height: 100
// Keep the aspect ratio and crop whatever does not fit the tile.
fillMode: VideoOutput.PreserveAspectCrop
// Each model entry is one object from manager.services; presumably
// VideoOutput drives it through its videoSurface property (confirm in Qt docs).
source: model.modelData
}
}
}
我在 QML 中显示来自 2 台 GigE 相机的实时视频流时遇到了问题。之前我用 QLabel 和 QPixmap 实现过,没有任何问题。但 QML 的 Label 没有 pixmap 属性,因此无法通过信号和槽把图像发送给它。
这是我的 Python 代码:
import sys
import os
from PySide2.QtGui import QGuiApplication
from PySide2.QtQml import QQmlApplicationEngine
from PySide2.QtGui import QImage, QPixmap
from PySide2.QtCore import Slot, QThread, Signal, Qt, QObject
import cv2
from pypylon import pylon
# Discover the connected cameras once at import time and attach at most two
# of them to a shared InstantCameraArray used by the grabbing thread.
tlFactory = pylon.TlFactory.GetInstance()
devices = tlFactory.EnumerateDevices()
if len(devices) == 0:
    raise pylon.RuntimeException("No camera present.")

cameras = pylon.InstantCameraArray(min(len(devices), 2))
for index, camera in enumerate(cameras):
    # Slot `index` of the array is bound to the device found at the same index.
    device = tlFactory.CreateDevice(devices[index])
    camera.Attach(device)
class CamThread(QThread):
    """Worker thread that grabs frames from the cameras and emits them as QImages.

    ``cam1`` carries frames of the first camera in the module-level ``cameras``
    array and ``cam2`` those of the second one. Each camera is handled
    independently, so a setup with a single attached camera works and simply
    never emits ``cam2``.
    """

    cam1 = Signal(QImage)
    cam2 = Signal(QImage)

    def run(self):
        """Grab frames until grabbing stops or an error occurs.

        Any exception (for example a retrieve timeout) is printed and ends
        the loop, which matches the original best-effort behaviour.
        """
        cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
        # zip() stops at the shorter sequence, so cameras[1] is never touched
        # when only one device was attached.
        outputs = list(zip(cameras, (self.cam1, self.cam2)))
        try:
            while cameras.IsGrabbing():
                for camera, signal in outputs:
                    grab_result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    try:
                        if grab_result.GrabSucceeded():
                            signal.emit(self._to_qimage(grab_result.GetArray()))
                    finally:
                        # Hand the buffer back to the grab engine right away.
                        grab_result.Release()
        except Exception as error:
            print(error)

    @staticmethod
    def _to_qimage(frame):
        """Convert a YUV422 frame into a scaled RGB888 QImage owning its data."""
        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_Y422)
        height, width, channels = rgb.shape
        # The QImage must describe the converted RGB buffer: same array for
        # the pixel data, the dimensions and the stride.
        image = QImage(
            rgb.data, width, height, channels * width, QImage.Format_RGB888
        )
        # QImage does not copy the numpy buffer; copy() detaches the result so
        # it stays valid after `rgb` is garbage collected.
        return image.scaled(800, 746, Qt.KeepAspectRatio).copy()
class MainWindow(QObject):
    """Backend object exposed to QML that owns the camera grabbing thread."""

    def __init__(self):
        super().__init__()
        worker = CamThread()
        # Route each camera's frames to its own slot before the thread runs.
        worker.cam1.connect(self.camera1)
        worker.cam2.connect(self.camera2)
        self.CamThread = worker
        worker.start()

    @Slot(QImage)
    def camera1(self, image):
        """Receive a frame from the first camera (currently a no-op)."""
        return None

    @Slot(QImage)
    def camera2(self, image):
        """Receive a frame from the second camera (currently a no-op)."""
        return None
if __name__ == "__main__":
    # Start the Qt application, expose the backend to QML as "backend" and
    # load main.qml from the directory containing this script.
    app = QGuiApplication(sys.argv)
    backend = MainWindow()
    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("backend", backend)
    qml_file = os.path.join(os.path.dirname(__file__), "main.qml")
    engine.load(qml_file)
    # No root objects means the QML file failed to load: exit with -1
    # without entering the event loop.
    exit_code = app.exec_() if engine.rootObjects() else -1
    sys.exit(exit_code)
那么如何使用 QML/PySide2 显示实时视频流呢?我正在使用 Qt Design Studio。
Qt 提供了不同的方法来将 images/video 流传递给 QML:
1. 将 pixmap 转换为 base64 编码
// Encode a pixmap as a PNG data URI so it can be used as an image source.
QByteArray byteArray;
// The buffer writes into byteArray.
QBuffer buffer(&byteArray);
buffer.open(QIODevice::WriteOnly);
// Serialize the pixmap into the buffer in PNG format.
pixmap.save(&buffer,"PNG");
// Start the string with the data URI prefix for a base64-encoded PNG.
QString data("data:image/png;base64,");
// Append the base64 text of the PNG bytes after the prefix.
data.append(QString::fromLatin1(byteArray.toBase64().data()));
此 base64 编码图像可能会传递给 Image::source
2. 使用 QQuickImageProvider
它允许将自定义的 image://... URL 直接关联到 QPixmap 或 QImage。更多信息请查阅文档。
3. 使用 QtMultimedia
其中的 VideoOutput 尤其可能有用。
虽然 QQuickImageProvider 选项可能是一个很好的选项,但缺点是您必须生成不同的 url,更好的选择是使用 VideoOutput,例如在您的情况下,以下实现应该有效(未测试):
from functools import cached_property
import os
import random
import sys
import threading
import cv2
from PySide2.QtCore import Property, QObject, Qt, QSize, QTimer, Signal, Slot
from PySide2.QtGui import QColor, QGuiApplication, QImage
from PySide2.QtMultimedia import QAbstractVideoSurface, QVideoFrame, QVideoSurfaceFormat
from PySide2.QtQml import QQmlApplicationEngine
import shiboken2
from pypylon import pylon
class CameraProvider(QObject):
    """Grabs frames from the cameras on a background thread.

    ``imageChanged`` is emitted with the camera index and a frame each time
    a grab succeeds.
    """

    imageChanged = Signal(int, QImage)

    def start(self, cameras):
        """Start the daemon thread that polls *cameras* for frames."""
        threading.Thread(target=self._execute, args=(cameras,), daemon=True).start()

    def _execute(self, cameras):
        """Poll every camera in turn for as long as the array is grabbing.

        Errors for one camera are printed and do not stop the loop.
        """
        while cameras.IsGrabbing():
            for i, camera in enumerate(cameras):
                grab_result = None
                try:
                    grab_result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    if grab_result.GrabSucceeded():
                        img = grab_result.GetArray()
                        # FIXME: convert `img` to a QImage. Until then a
                        # randomly coloured placeholder frame is emitted.
                        qimage = QImage(800, 746, QImage.Format_RGB888)
                        qimage.fill(QColor(*random.sample(range(0, 255), 3)))
                        # Skip the emit if the underlying C++ object is gone.
                        if shiboken2.isValid(self):
                            self.imageChanged.emit(i, qimage.copy())
                except Exception as error:
                    print(error)
                finally:
                    # Hand the buffer back to the grab engine once it was
                    # actually retrieved.
                    if grab_result is not None:
                        grab_result.Release()
class CameraService(QObject):
    """Feeds QImage frames to a video surface exposed as ``videoSurface``."""

    surfaceChanged = Signal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._surface = None
        self._format = QVideoSurfaceFormat()
        self._format_is_valid = False

    def get_surface(self):
        """Return the attached video surface, or ``None`` if there is none."""
        return self._surface

    def set_surface(self, surface):
        """Attach *surface*: stop the old one if active, then start the new one."""
        previous = self._surface
        if previous is surface:
            return
        # Past the guard above the surface is known to differ, so only the
        # "exists and is active" part needs checking.
        if previous is not None and previous.isActive():
            previous.stop()
        self._surface = surface
        self.surfaceChanged.emit()
        if self._surface is not None:
            self._start_surface()

    videoSurface = Property(
        QAbstractVideoSurface,
        fget=get_surface,
        fset=set_surface,
        notify=surfaceChanged,
    )

    @Slot(QImage)
    def update_frame(self, qimage):
        """Present *qimage* on the surface; ignored without a surface or image."""
        if self.videoSurface is None or qimage.isNull():
            return
        if not self._format_is_valid:
            # The first frame fixes the surface format to its dimensions.
            self._set_format(qimage.width(), qimage.height(), QVideoFrame.Format_RGB32)
            self._format_is_valid = True
        image_format = QVideoFrame.imageFormatFromPixelFormat(
            QVideoFrame.Format_RGB32
        )
        qimage.convertTo(image_format)
        self._surface.present(QVideoFrame(qimage))

    def _set_format(self, width, height, pixel_format):
        """Store a new surface format and restart the surface with it, if any."""
        self._format = QVideoSurfaceFormat(QSize(width, height), pixel_format)
        if self._surface is None:
            return
        if self._surface.isActive():
            self._surface.stop()
        self._start_surface()

    def _start_surface(self):
        """Start the surface with the nearest format it offers for ours."""
        negotiated = self._surface.nearestFormat(self._format)
        self._format = negotiated
        self._surface.start(negotiated)
class CameraManager(QObject):
    """Owns one CameraService per camera and routes grabbed frames to them."""

    def __init__(self, cameras, parent=None):
        super().__init__(parent)
        self._services = []
        grabber = self.provider
        grabber.imageChanged.connect(self.handle_image_changed)
        grabber.start(cameras)
        # One service per camera, in camera order, so that the index carried
        # by imageChanged selects the matching service.
        self._services.extend(CameraService() for _ in cameras)

    @cached_property
    def provider(self):
        """Create the CameraProvider on first access and reuse it afterwards."""
        return CameraProvider()

    @Slot(int, QImage)
    def handle_image_changed(self, index, qimage):
        """Forward *qimage* to the service at position *index*."""
        service = self._services[index]
        service.update_frame(qimage)

    def get_services(self):
        """Return the list of services backing the ``services`` property."""
        return self._services

    services = Property("QVariantList", fget=get_services, constant=True)
def main():
    """Attach up to two cameras, start grabbing and run the QML application.

    Raises:
        pylon.RuntimeException: if no camera device is found.
    """
    app = QGuiApplication(sys.argv)

    tlFactory = pylon.TlFactory.GetInstance()
    devices = tlFactory.EnumerateDevices()
    if len(devices) == 0:
        raise pylon.RuntimeException("No camera present.")

    cameras = pylon.InstantCameraArray(min(len(devices), 2))
    for i, cam in enumerate(cameras):
        cam.Attach(tlFactory.CreateDevice(devices[i]))

    # The provider thread loops on cameras.IsGrabbing(); grabbing has to be
    # started first, otherwise that loop ends immediately and no frame is
    # ever delivered.
    cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

    manager = CameraManager(cameras)

    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("manager", manager)
    engine.load(os.path.join(os.path.dirname(__file__), "main.qml"))
    # No root objects means main.qml failed to load.
    if not engine.rootObjects():
        sys.exit(-1)
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()
import QtQuick 2.14
import QtQuick.Window 2.14
import QtMultimedia 5.14
// Main window: shows one VideoOutput tile per entry of manager.services.
Window {
visible: true
width: 640
height: 480
title: qsTr("Hello World")
GridView {
width: 300; height: 200
// Fall back to an empty model when the "manager" context property is null.
model: manager !== null ? manager.services : []
delegate: VideoOutput {
width: 100
height: 100
// Keep the aspect ratio and crop whatever does not fit the tile.
fillMode: VideoOutput.PreserveAspectCrop
// Each model entry is one object from manager.services; presumably
// VideoOutput drives it through its videoSurface property (confirm in Qt docs).
source: model.modelData
}
}
}