使用 PyQt5 和 pyudev 为 'usb device inserted' 事件通知 QML

Notify QML for 'usb device inserted' events using PyQt5 and pyudev

我有一个 GUI 应用程序(使用 PyQt5 和 QML 制作),希望在 USB 设备插入计算机或从计算机拔出时得到通知。经过一些调查,我发现 pyudev 可能是合适的库,但是我在将它与 PyQt5 和 QML 一起使用时遇到了问题。pyudev 文档中有 MonitorObserver 的示例,还提供了其他示例(一个使用 PySide,一个使用 GLib)。我还找到了一个在 PyQt5 widgets 应用程序中使用它的示例,并且已经成功运行。但是在我的 PyQt5 QML 应用程序中实现它时遇到了麻烦。我确信这很容易,应该只是遗漏了一些东西,但我找不出是什么……

这是我目前的情况:

import sys
from PyQt5.QtWidgets import QApplication
from PyQt5.QtQml import QQmlApplicationEngine
from PyQt5.QtCore import QUrl
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver
from Passerelle import *

# def device_connected(self, device):
def device_connected(device):
    """Print the udev action and device path of a device event.

    Connected below as the slot for ``observer.deviceEvent``. This is a
    module-level function, so the single argument delivered by the signal is
    the device itself, not a ``self`` instance.

    Args:
        device: object exposing ``action`` and ``device_path`` attributes.
    """
    print("Test")
    print("device action: ", device.action, ", path: ", device.device_path)
if __name__ == "__main__":
    app = QApplication(sys.argv)
    engine = QQmlApplicationEngine()
    # Expose the backend object to QML under the name "passerelle".
    p = Passerelle()
    engine.rootContext().setContextProperty("passerelle", p)
    engine.load(QUrl("main.qml"))
    # rootObjects must be *called*: testing the bound method itself is always
    # truthy, so a failed QML load would never be detected.
    if not engine.rootObjects():
        sys.exit(-1)

    # Watch udev events and forward each one to device_connected. The
    # monitor and observer stay referenced here for the whole lifetime of
    # the event loop started below.
    context = Context()
    monitor = Monitor.from_netlink(context)
    # monitor.filter_by(subsystem='tty')
    observer = MonitorObserver(monitor)
    observer.deviceEvent.connect(device_connected)
    monitor.start()
    ret = app.exec_()
    sys.exit(ret)

拔出或插回设备时,我已经能够在控制台上打印 "Test",但似乎无法打印设备信息。当我取消注释 def device_connected(self, device): 时,会出现 TypeError: device_connected() missing 1 required positional argument: 'device'。

第一步是能够在控制台打印设备信息,然后想办法通知 GUI,最后做到只有当插拔的设备具有指定的 VID/PID 时才通知 GUI。

编辑:我找到了一种通过 VID/PID 识别设备的方法:使用 vid = device.get('ID_VENDOR_ID') 和 pid = device.get('ID_MODEL_ID')。

对于第二步,我曾想过使用 Passerelle class 作为 QML 后端:

from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal#, pyqtProperty, QUrl
from pyudev import Context, Monitor
from pyudev.pyqt5 import MonitorObserver

def device_event(observer, device):
    """Print the action of a device event followed by the device itself.

    Args:
        observer: accepted but not used by this function.
        device: object exposing an ``action`` attribute.
    """
    fields = ("event ", device.action, " on device ", device)
    print(*fields)
class Passerelle(QObject):
    """QML backend object that can watch udev for USB device events."""

    # Declared for notifying QML; nothing in this class emits it yet.
    sendDeviceEvent = pyqtSignal(int)

    def __init__(self, parent=None):
        print("Passerelle constructor called")
        QObject.__init__(self, parent)
        # Set by setObserverForDeviceEvents. They are kept on the instance
        # so they stay referenced after that slot returns.
        self._monitor = None
        self._observer = None
        print("end Passerelle constructor")

    @pyqtSlot()
    def setObserverForDeviceEvents(self):
        """Start watching udev 'usb' subsystem events.

        Each event is delivered to :meth:`device_connected`. Calling this
        slot again after the observer exists does not install a second one.
        """
        print("setObserverForDeviceEvents called")
        if self._observer is None:
            monitor = Monitor.from_netlink(Context())
            monitor.filter_by(subsystem='usb')
            # Parent the observer to this object and keep references to both
            # objects; as plain locals nothing would hold them once this
            # method returns.
            observer = MonitorObserver(monitor, self)
            observer.deviceEvent.connect(self.device_connected)
            self._monitor = monitor
            self._observer = observer
            monitor.start()
        print("end setObserverForDeviceEvents")

    def device_connected(self, device):
        """Print the action and device path of a received udev event."""
        print("Test")
        print("device action: ", device.action, ", path: ", device.device_path)

但我不确定这是个好主意,因为我读到需要在进入 Qt 的主循环之前启动监视器。按我的理解:监视器应该在调用 app.exec_() 之前在 main.py 中启动……

提前感谢您的帮助!

最好的办法是在 QML 中修改 GUI,为此必须能够从 QML 访问 Monitor 和 Device 对象。只有 QObject 才能接收通知,所以我将创建 2 个类,使用 q-properties 和 slots 对这两个类各做一层轻量封装。

pyqtudev.py

from PyQt5 import QtCore
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver

class QtUdevDevice(QtCore.QObject):
    """QObject wrapper that exposes a pyudev ``Device`` to QML.

    A fresh instance wraps nothing and reports itself as invalid until
    :meth:`initialize` is called. While invalid, string accessors return
    ``""``, list accessors return ``[]``, ``sysfsNumber`` returns ``-1`` and
    ``parentDevice`` returns ``None``.
    """

    def __init__(self, parent=None):
        super(QtUdevDevice, self).__init__(parent)
        self.m_dev = None
        # Lazily created wrapper around m_dev.parent (see parentDevice).
        self.m_parent_wrapper = None

    def initialize(self, dev):
        """Attach *dev* as the wrapped device."""
        self.m_dev = dev
        # A wrapper cached for a previously attached device is now stale.
        stale = self.m_parent_wrapper
        self.m_parent_wrapper = None
        if stale is not None:
            stale.deleteLater()

    @QtCore.pyqtSlot(result=bool)
    def isValid(self):
        """Return True once a device has been attached."""
        return self.m_dev is not None

    @QtCore.pyqtProperty(str, constant=True)
    def devType(self):
        """The device's ``device_type``, or ``""`` when it is None."""
        if not self.isValid():
            return ""
        if self.m_dev.device_type is None:
            return ""
        return self.m_dev.device_type

    @QtCore.pyqtProperty(str, constant=True)
    def subsystem(self):
        """The device's ``subsystem``."""
        if not self.isValid():
            return ""
        return self.m_dev.subsystem

    @QtCore.pyqtProperty(str, constant=True)
    def name(self):
        """The device's ``sys_name``."""
        if not self.isValid():
            return ""
        return self.m_dev.sys_name

    @QtCore.pyqtProperty(str, constant=True)
    def driver(self):
        """The device's ``driver``, or ``""`` when it is None."""
        if not self.isValid():
            return ""
        if self.m_dev.driver is None:
            return ""
        return self.m_dev.driver

    @QtCore.pyqtProperty(str, constant=True)
    def deviceNode(self):
        """The device's ``device_node``, or ``""`` when it is None."""
        if not self.isValid():
            return ""
        if self.m_dev.device_node is None:
            return ""
        return self.m_dev.device_node

    @QtCore.pyqtProperty(list, constant=True)
    def alternateDeviceSymlinks(self):
        """The device's ``device_links`` as a list."""
        # Guarded like every other accessor so that an uninitialised wrapper
        # yields an empty list instead of raising AttributeError.
        if not self.isValid():
            return []
        return list(self.m_dev.device_links)

    @QtCore.pyqtProperty(str, constant=True)
    def sysfsPath(self):
        """The device's ``sys_path``."""
        if not self.isValid():
            return ""
        return self.m_dev.sys_path

    @QtCore.pyqtProperty(int, constant=True)
    def sysfsNumber(self):
        """The device's ``sys_number`` as an int, or -1 when it is None."""
        if not self.isValid():
            return -1
        if self.m_dev.sys_number is None:
            return -1
        return int(self.m_dev.sys_number)

    # NOTE(review): this slot reuses the name of QObject.property(); the name
    # is kept because QML callers invoke device.property("...").
    @QtCore.pyqtSlot(str, result=str)
    def property(self, name):
        """Return the udev property *name*, or ``""`` when it is absent."""
        if not self.isValid():
            return ""
        v = self.m_dev.properties.get(name)
        return v if v is not None else ""

    @QtCore.pyqtSlot(str, result=bool)
    def hasProperty(self, name):
        """Return True when the udev property *name* has a value."""
        if not self.isValid():
            return False
        return self.m_dev.properties.get(name) is not None

    @QtCore.pyqtProperty(list, constant=True)
    def deviceProperties(self):
        """The list obtained by iterating the device's ``properties``."""
        if not self.isValid():
            return []
        return list(self.m_dev.properties)

    @QtCore.pyqtProperty(list, constant=True)
    def sysfsProperties(self):
        """The device's ``attributes.available_attributes`` as a list."""
        if not self.isValid():
            return []
        return list(self.m_dev.attributes.available_attributes)

    @QtCore.pyqtProperty(QtCore.QObject, constant=True)
    def parentDevice(self):
        """A ``QtUdevDevice`` wrapping the parent device, or None.

        The wrapper is created on first access and then reused.
        """
        # isValid must be called; the bare method object is always truthy,
        # which made this guard dead code.
        if not self.isValid():
            return None
        if self.m_parent_wrapper is None:
            parent_dev = self.m_dev.parent
            if parent_dev is None:
                return None
            # Parent the wrapper to this object and keep a reference, so it
            # is not left owned only by a Python local that disappears when
            # this getter returns.
            wrapper = QtUdevDevice(self)
            wrapper.initialize(parent_dev)
            self.m_parent_wrapper = wrapper
        return self.m_parent_wrapper

    @QtCore.pyqtProperty(str, constant=True)
    def action(self):
        """The device's ``action``, or ``""`` when it is None."""
        if not self.isValid():
            return ""
        if self.m_dev.action is None:
            return ""
        return self.m_dev.action

    def __repr__(self):
        if self.isValid():
            # sysfsPath is a property, so it is read rather than called.
            return "UdevDevice({})".format(self.sysfsPath)
        return "Invalid UdevDevice"

class QtMonitorObserver(QtCore.QObject):
    """QObject facade over a pyudev netlink monitor, usable from QML.

    Every event received from the underlying observer is re-emitted as
    ``deviceEvent`` carrying a ``QtUdevDevice``. Events whose action is
    ``add``, ``remove``, ``change``, ``online`` or ``offline`` fire the
    matching action-specific signal first.
    """

    deviceEvent = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceAdded = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceRemoved = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceChanged = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOnlined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOfflined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])

    def __init__(self, parent=None):
        super().__init__(parent)
        self._monitor = Monitor.from_netlink(Context())
        # The pyudev observer is parented to this object and feeds every
        # event into setup_new_signals.
        self._observer = MonitorObserver(self._monitor, self)
        self._observer.deviceEvent.connect(self.setup_new_signals)

    @QtCore.pyqtSlot()
    def start(self):
        """Start the underlying monitor."""
        self._monitor.start()

    @QtCore.pyqtSlot(str)
    def filter_by(self, filter):
        """Filter the monitor by the subsystem named *filter*."""
        self._monitor.filter_by(subsystem=filter)

    @QtCore.pyqtSlot(str)
    def filter_by_tag(self, tag):
        """Filter the monitor by *tag*."""
        self._monitor.filter_by_tag(tag)

    @QtCore.pyqtSlot()
    def remove_filter(self):
        """Remove the filters installed on the monitor."""
        self._monitor.remove_filter()

    @QtCore.pyqtSlot(Device)
    def setup_new_signals(self, device):
        """Wrap *device* and emit the action-specific and generic signals."""
        action_signal = {
            'add': self.deviceAdded,
            'remove': self.deviceRemoved,
            'change': self.deviceChanged,
            'online': self.deviceOnlined,
            'offline': self.deviceOfflined,
        }.get(device.action)

        wrapped = QtUdevDevice()
        wrapped.initialize(device)

        # Unknown actions have no dedicated signal; they still reach
        # deviceEvent below.
        if action_signal:
            action_signal.emit(wrapped)
        self.deviceEvent.emit(wrapped)

main.py

import os
import sys
from PyQt5 import QtCore, QtGui, QtQml

from pyqtudev import QtMonitorObserver

def run():
    """Start the QML application and return its exit status.

    Returns -1 when loading ``main.qml`` (looked up next to this script)
    yields no root objects, otherwise the value of ``app.exec_()``.
    """
    app = QtGui.QGuiApplication(sys.argv)
    engine = QtQml.QQmlApplicationEngine()

    # QML reaches the udev bridge through the "observer" context property.
    observer = QtMonitorObserver()
    engine.rootContext().setContextProperty("observer", observer)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    qml_url = QtCore.QUrl.fromLocalFile(os.path.join(script_dir, 'main.qml'))
    engine.load(qml_url)

    return app.exec_() if engine.rootObjects() else -1

# Run the application only when executed as a script, and use the value
# returned by run() as the process exit status.
if __name__ == "__main__":
    sys.exit(run())

main.qml

import QtQuick 2.11
import QtQuick.Window 2.2
import QtQuick.Controls 2.2

ApplicationWindow {
    visible: true
    // Open at half the screen size in each dimension.
    width: Screen.width/2
    height: Screen.height/2

    // Log every event forwarded by the "observer" context property.
    Connections {
        target: observer
        onDeviceEvent: {
            console.log(device, device.name, device.action, device.parentDevice)
            // The vendor ID is only logged for devices that carry it.
            if(device.hasProperty("ID_VENDOR_ID")){
                console.log(device.property("ID_VENDOR_ID"))
            }
        }
    }

    Component.onCompleted: {
        // Install the filter before starting the monitor, so no events from
        // other subsystems are delivered in between.
        observer.filter_by("usb")
        observer.start()
    }
}