来自 GigE 相机的实时视频

Live video from GigE Cameras

我想在 QML 中显示来自 2 台 GigE 相机的实时视频流,但遇到了问题。我之前用 QLabel 和 QPixmap 实现过,没有任何问题。但是 QML 的 Label 没有 pixmap 属性,因此无法通过信号/槽把图像直接发送给它。

这是我的 Python 代码:

import sys
import os
from PySide2.QtGui import QGuiApplication
from PySide2.QtQml import QQmlApplicationEngine
from PySide2.QtGui import QImage, QPixmap
from PySide2.QtCore import Slot, QThread, Signal, Qt, QObject
import cv2
from pypylon import pylon


# Enumerate the devices reported by pylon's transport layer factory and
# abort immediately when none is found.
tlFactory = pylon.TlFactory.GetInstance()
devices = tlFactory.EnumerateDevices()
if len(devices) == 0:
    raise pylon.RuntimeException("No camera present.")

# The array holds at most two cameras, even when more devices are found.
cameras = pylon.InstantCameraArray(min(len(devices), 2))


# Attach the i-th enumerated device to the i-th entry of the camera array.
for i, cam in enumerate(cameras):
    cam.Attach(tlFactory.CreateDevice(devices[i]))


class CamThread(QThread):
    """Worker thread that grabs camera frames and emits them as QImages.

    Signals:
        cam1: scaled RGB frame from the first camera of ``cameras``.
        cam2: scaled RGB frame from the second camera of ``cameras``.
    """

    cam1 = Signal(QImage)
    cam2 = Signal(QImage)

    def run(self):
        """Grab frames until grabbing stops or an error occurs.

        Any exception raised inside the loop (including a retrieve timeout)
        is printed and ends the thread.
        """
        cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

        try:
            while cameras.IsGrabbing():
                # zip() stops at the shorter sequence, so an array holding a
                # single camera only feeds cam1 instead of failing on
                # cameras[1].
                for signal, camera in zip((self.cam1, self.cam2), cameras):
                    grab_result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    try:
                        if grab_result.GrabSucceeded():
                            signal.emit(
                                self._to_qimage(grab_result.GetArray())
                            )
                    finally:
                        # Hand the buffer back to pylon as soon as the frame
                        # has been converted.
                        grab_result.Release()

        except Exception as error:
            print(error)

    @staticmethod
    def _to_qimage(frame):
        """Convert one raw frame into a scaled RGB QImage that owns its pixels.

        Args:
            frame: array returned by ``GetArray()``; assumed to be YUV 4:2:2,
                as implied by the conversion code used here (confirm against
                the camera's pixel format).

        Returns:
            A QImage scaled to fit 800x746 while keeping the aspect ratio.
        """
        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_Y422)
        height, width, channels = rgb.shape
        # Wrap the converted RGB buffer (not the raw frame) and derive the
        # stride from this image's own width.
        image = QImage(
            rgb.data, width, height, channels * width, QImage.Format_RGB888
        )
        scaled = image.scaled(800, 746, Qt.KeepAspectRatio)
        # QImage does not copy the numpy buffer, and scaled() returns a
        # shallow copy when the size is unchanged; detach in that case so the
        # emitted image never points at memory that is freed on return.
        if scaled.size() == image.size():
            scaled = scaled.copy()
        return scaled


class MainWindow(QObject):
    """Backend object that owns the camera thread and receives its frames."""

    def __init__(self):
        super().__init__()
        worker = CamThread()
        worker.cam1.connect(self.camera1)
        worker.cam2.connect(self.camera2)
        # Keep the thread referenced on the instance for its whole lifetime.
        self.CamThread = worker
        worker.start()

    @Slot(QImage)
    def camera1(self, image):
        """Receive a frame from the first camera (currently a no-op)."""

    @Slot(QImage)
    def camera2(self, image):
        """Receive a frame from the second camera (currently a no-op)."""


if __name__ == "__main__":
    # Create the application, expose the backend to QML and load the UI.
    app = QGuiApplication(sys.argv)
    backend = MainWindow()
    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("backend", backend)

    qml_file = os.path.join(os.path.dirname(__file__), "main.qml")
    engine.load(qml_file)

    # Without root objects there is nothing to show: exit with -1 instead of
    # entering the event loop.
    exit_code = app.exec_() if engine.rootObjects() else -1
    sys.exit(exit_code)

那么,如何使用 QML/PySide2 显示实时视频流呢?我使用的是 Qt Design Studio。

Qt 提供了几种不同的方法来将图像/视频流传递给 QML:

1. 将 pixmap 转换为 base64 编码

// Serialize the pixmap as PNG into an in-memory byte array.
// NOTE(review): `pixmap` is not defined in this snippet; presumably a QPixmap
// supplied by the surrounding code.
QByteArray byteArray;
QBuffer buffer(&byteArray);
buffer.open(QIODevice::WriteOnly);
pixmap.save(&buffer,"PNG");
// Build a data URL: fixed PNG/base64 prefix followed by the encoded bytes.
QString data("data:image/png;base64,");
data.append(QString::fromLatin1(byteArray.toBase64().data()));

这个 base64 编码的图像可以直接赋值给 Image 的 source 属性。

2. 使用 QQuickImageProvider

这允许将自定义的 image://... url 直接关联到 QPixmap 或 QImage。更多信息请查看文档。

3. 使用 QtMultimedia

特别是VideoOutput可能会有用。

QQuickImageProvider 可能是一个不错的选择,但它的缺点是每一帧都必须生成不同的 url。更好的选择是使用 VideoOutput。例如,在您的情况下,下面的实现应该可行(未经测试):

from functools import cached_property
import os
import random
import sys
import threading

import cv2

from PySide2.QtCore import Property, QObject, Qt, QSize, QTimer, Signal, Slot
from PySide2.QtGui import QColor, QGuiApplication, QImage
from PySide2.QtMultimedia import QAbstractVideoSurface, QVideoFrame, QVideoSurfaceFormat
from PySide2.QtQml import QQmlApplicationEngine
import shiboken2

from pypylon import pylon


class CameraProvider(QObject):
    """Background frame source that reports one QImage per successful grab.

    ``imageChanged`` carries the index of the camera within the array and the
    image produced for it.
    """

    imageChanged = Signal(int, QImage)

    def start(self, cameras):
        """Launch the grab loop for ``cameras`` on a daemon thread."""
        worker = threading.Thread(
            target=self._execute, args=(cameras,), daemon=True
        )
        worker.start()

    def _execute(self, cameras):
        """Poll every camera in turn for as long as ``cameras`` is grabbing.

        An error raised while handling one camera is printed and polling
        continues with the next one.
        """
        while cameras.IsGrabbing():
            for index, camera in enumerate(cameras):
                try:
                    result = camera.RetrieveResult(
                        5000, pylon.TimeoutHandling_ThrowException
                    )
                    if not result.GrabSucceeded():
                        continue
                    img = result.GetArray()
                    # FIXME
                    # convert img to qimage
                    # Placeholder: a solid, randomly coloured 800x746 image is
                    # emitted instead of the grabbed frame.
                    red, green, blue = random.sample(range(0, 255), 3)
                    frame = QImage(800, 746, QImage.Format_RGB888)
                    frame.fill(QColor(red, green, blue))
                    # Skip the emit once the underlying Qt object is gone.
                    if not shiboken2.isValid(self):
                        continue
                    self.imageChanged.emit(index, frame.copy())
                except Exception as error:
                    print(error)


class CameraService(QObject):
    """Presents QImage frames on a video surface assigned from outside.

    The surface is exposed through the ``videoSurface`` property; frames are
    pushed with ``update_frame``.
    """

    surfaceChanged = Signal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._surface = None
        self._format = QVideoSurfaceFormat()
        self._format_is_valid = False

    def get_surface(self):
        """Return the current video surface, or None when none is set."""
        return self._surface

    def set_surface(self, surface):
        """Replace the video surface, stopping the old one and starting the new."""
        previous = self._surface
        if previous is surface:
            return
        if previous is not None and previous.isActive():
            previous.stop()
        self._surface = surface
        self.surfaceChanged.emit()

        if self._surface is not None:
            self._start_surface()

    videoSurface = Property(
        QAbstractVideoSurface,
        fget=get_surface,
        fset=set_surface,
        notify=surfaceChanged,
    )

    @Slot(QImage)
    def update_frame(self, qimage):
        """Present ``qimage`` on the surface; ignored without surface or image."""
        if self.videoSurface is None or qimage.isNull():
            return

        pixel_format = QVideoFrame.Format_RGB32
        # The surface format is derived once, from the first frame's size.
        if not self._format_is_valid:
            self._set_format(qimage.width(), qimage.height(), pixel_format)
            self._format_is_valid = True

        # Convert in place to the image format matching the pixel format.
        qimage.convertTo(QVideoFrame.imageFormatFromPixelFormat(pixel_format))
        self._surface.present(QVideoFrame(qimage))

    def _set_format(self, width, height, pixel_format):
        """Store a new surface format and restart the surface, if any, with it."""
        self._format = QVideoSurfaceFormat(QSize(width, height), pixel_format)
        if self._surface is None:
            return
        if self._surface.isActive():
            self._surface.stop()
        self._start_surface()

    def _start_surface(self):
        """Start the surface with the nearest format it offers for ours."""
        self._format = self._surface.nearestFormat(self._format)
        self._surface.start(self._format)


class CameraManager(QObject):
    """Routes frames from one CameraProvider to one CameraService per camera."""

    def __init__(self, cameras, parent=None):
        super().__init__(parent)
        self._services = []

        source = self.provider
        source.imageChanged.connect(self.handle_image_changed)
        source.start(cameras)

        # One service per camera, in array order, so a frame's index selects
        # its service directly.
        self._services.extend(CameraService() for _ in cameras)

    @cached_property
    def provider(self):
        """Return the provider, created on first access and then reused."""
        return CameraProvider()

    @Slot(int, QImage)
    def handle_image_changed(self, index, qimage):
        """Forward ``qimage`` to the service at position ``index``."""
        service = self._services[index]
        service.update_frame(qimage)

    def get_services(self):
        """Return the list of services."""
        return self._services

    services = Property("QVariantList", fget=get_services, constant=True)


def main():
    """Set up the cameras, expose the manager to QML and run the event loop.

    Raises:
        pylon.RuntimeException: when no camera device is found.
    """
    app = QGuiApplication(sys.argv)

    tlFactory = pylon.TlFactory.GetInstance()
    devices = tlFactory.EnumerateDevices()
    if len(devices) == 0:
        raise pylon.RuntimeException("No camera present.")

    # Use at most two cameras, even when more devices are found.
    cameras = pylon.InstantCameraArray(min(len(devices), 2))

    for i, cam in enumerate(cameras):
        cam.Attach(tlFactory.CreateDevice(devices[i]))

    # The provider's worker loops on cameras.IsGrabbing(); acquisition must be
    # running before it starts, otherwise the loop exits immediately and no
    # frame is ever delivered.
    cameras.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

    manager = CameraManager(cameras)

    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("manager", manager)
    engine.load(os.path.join(os.path.dirname(__file__), "main.qml"))
    # Without root objects there is nothing to show; exit with -1.
    if not engine.rootObjects():
        sys.exit(-1)

    sys.exit(app.exec_())


# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()
import QtQuick 2.14
import QtQuick.Window 2.14
import QtMultimedia 5.14

// Root window of main.qml. `manager` is the context property set by the
// Python side before this file is loaded.
Window {
    visible: true
    width: 640
    height: 480
    title: qsTr("Hello World")

    // One VideoOutput per entry of manager.services.
    GridView {
        width: 300; height: 200

        // Fall back to an empty model while `manager` is null.
        model: manager !== null ? manager.services : []
        delegate: VideoOutput {
            width: 100
            height: 100
            fillMode: VideoOutput.PreserveAspectCrop
            // Each list entry is used directly as the video source.
            source: model.modelData
        }
    }
}