PyQt6 实现周期性后台任务的正确方法

PyQt6 proper way to implement a periodic background task

我正在从 Prometheus 数据库读取天气传感器数据,并每 30 秒更新一次显示这些数据的小部件。“每 30 秒”是通过工作线程中的 time.sleep 实现的,我猜应该有更好的做法?例如,我尝试在 MainWindow 上实现 closeEvent 方法,但程序永远不会退出。使用 QRunnable 执行周期性任务的正确方法是什么?

from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor

import time

class WorkerSignals(QtCore.QObject):
    """Signal holder used by ``Worker`` to publish readings.

    ``Worker`` creates one instance of this class and emits ``result`` from
    its ``run`` method.
    """
    # Emitted with a single float payload (the latest sensor reading).
    result = QtCore.pyqtSignal(float)

class Worker(QtCore.QRunnable):
    """Runnable that polls a sensor in an endless loop.

    Args:
        prom_sensor: Object exposing ``get_sensor_latest()``; its return
            value is converted with ``float()`` before being emitted.
    """

    def __init__(self, prom_sensor):
        super().__init__()
        self.signals = WorkerSignals()
        self.prom_sensor = prom_sensor

    # NOTE(review): pyqtSlot targets QObject subclasses and QRunnable is not
    # one, so this decorator is presumably unnecessary -- confirm.
    @QtCore.pyqtSlot()
    def run(self):
        """Emit the latest reading, sleep 30 seconds, and repeat forever.

        The loop has no exit condition, so once started this runnable never
        returns.
        """
        while True:
            print("Thread start")
            latest = self.prom_sensor.get_sensor_latest()
            self.signals.result.emit(float(latest))
            # Blocks the thread running this runnable for the full interval.
            time.sleep(30)
            print("Thread complete")

class MainWindow(QtWidgets.QMainWindow):
    """Main window showing the latest sensor reading in one large label.

    The constructor builds the UI, shows the window, and hands a single
    ``Worker`` to a ``QThreadPool``. Every value the worker emits is
    delivered to :meth:`process_result`.
    """

    def __init__(self):
        super().__init__()
        self.threadpool = QtCore.QThreadPool()
        self.prom_sensor = GetPromSensor()

        # Last value written to the label; lets process_result skip repeats.
        self.previous = None

        layout = QtWidgets.QGridLayout()

        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 exposes enum members on the enum type itself, so the flag is
        # Qt.AlignmentFlag.AlignCenter rather than Qt.Alignment.AlignCenter.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))

        layout.addWidget(self.label)

        widget = QtWidgets.QWidget()
        widget.setLayout(layout)

        self.setCentralWidget(widget)

        self.show()

        self.start_worker()

    def start_worker(self):
        """Create a ``Worker``, connect its result signal, and start it."""
        worker = Worker(prom_sensor=self.prom_sensor)
        worker.signals.result.connect(self.process_result)

        self.threadpool.start(worker)

    def process_result(self, temperature):
        """Show ``temperature`` in the label unless it equals the last value.

        Args:
            temperature: Reading emitted by the worker's ``result`` signal.
        """
        if temperature == self.previous:
            return

        self.previous = temperature
        print("receiving:", temperature)
        self.label.setText(str(temperature))

if __name__ == "__main__":
    # Build the application and the main window (the window shows itself in
    # its constructor), then hand control to the Qt event loop.
    application = QtWidgets.QApplication([])
    main_window = MainWindow()
    application.exec()

您可以不使用 time.sleep,而是用 QTimer 每隔 T 秒启动一个 worker,让它在另一个线程上执行耗时的读取任务。

from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore

from sensor import GetPromSensor


class WorkerSignals(QtCore.QObject):
    """Signal holder shared between ``MainWindow`` and its workers.

    ``MainWindow`` creates one instance, connects ``result`` to its handler,
    and passes the same instance to every ``Worker`` it starts.
    """
    # Emitted with a single float payload (the latest sensor reading).
    result = QtCore.pyqtSignal(float)


class Worker(QtCore.QRunnable):
    """One-shot runnable that reads the sensor once and reports the value.

    Args:
        prom_sensor: Object exposing ``get_sensor_latest()``.
        signals: Object whose ``result`` signal receives the reading.
    """

    def __init__(self, prom_sensor, signals):
        super().__init__()
        self.signals = signals
        self.prom_sensor = prom_sensor

    def run(self):
        """Fetch the latest reading and emit it as a float, then return."""
        self.signals.result.emit(float(self.prom_sensor.get_sensor_latest()))


class MainWindow(QtWidgets.QMainWindow):
    """Main window that polls the sensor periodically without blocking.

    A single-shot ``QTimer`` re-arms :meth:`request_data` after every call.
    Each call submits a short-lived ``Worker`` to the thread pool, and the
    worker's reading is delivered to :meth:`process_result`.
    """

    # Delay between two consecutive sensor requests, in milliseconds.
    REQUEST_INTERVAL_MS = 5 * 1000

    def __init__(self):
        super().__init__()
        self.prom_sensor = GetPromSensor()

        self.threadpool = QtCore.QThreadPool()
        # One signals object is shared by every worker, so the connection to
        # process_result is made only once.
        self.signals = WorkerSignals()
        self.signals.result.connect(self.process_result)

        # Last value written to the label; lets process_result skip repeats.
        self.previous = None

        widget = QtWidgets.QWidget()
        self.setCentralWidget(widget)
        layout = QtWidgets.QGridLayout(widget)

        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 exposes enum members on the enum type itself, so the flag is
        # Qt.AlignmentFlag.AlignCenter rather than Qt.Alignment.AlignCenter.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))

        layout.addWidget(self.label)

        self.request_data()

    def request_data(self):
        """Submit one ``Worker`` to the pool and schedule the next request."""
        worker = Worker(prom_sensor=self.prom_sensor, signals=self.signals)
        self.threadpool.start(worker)
        # Re-arm a single-shot timer so this method runs again after the
        # configured interval.
        QtCore.QTimer.singleShot(self.REQUEST_INTERVAL_MS, self.request_data)

    @QtCore.pyqtSlot(float)
    def process_result(self, temperature):
        """Show ``temperature`` in the label unless it equals the last value.

        Args:
            temperature: Reading emitted through ``WorkerSignals.result``.
        """
        if temperature == self.previous:
            return

        self.previous = temperature
        print("receiving:", temperature)
        self.label.setNum(temperature)


if __name__ == "__main__":
    # Build the application, create and show the main window, then hand
    # control to the Qt event loop.
    application = QtWidgets.QApplication([])
    main_window = MainWindow()
    main_window.show()
    application.exec()

注意:pyqtSlot 装饰器仅适用于 QObject 的子类;QRunnable 不是 QObject,因此不应在其 run 方法上使用 pyqtSlot。