PyQt6 实现周期性后台任务的正确方法
PyQt6 proper way to implement a periodic background task
我正在从 Prometheus 数据库读取天气传感器数据,并每 30 秒更新一次显示这些数据的小部件。“每 30 秒”目前是通过在工作线程中调用 time.sleep 实现的,我猜应该有更好的做法。例如,我曾尝试在 MainWindow 上实现 closeEvent 方法,但程序始终无法退出。使用 QRunnable 执行周期性任务的正确方法是什么?
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
import time
class WorkerSignals(QtCore.QObject):
    """Holder for the signals a worker emits."""

    # Carries a single float argument.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """Runnable that keeps polling the sensor and reports each reading.

    Args:
        prom_sensor: Object providing ``get_sensor_latest()``; the value it
            returns is converted with ``float()`` before being emitted.
    """

    def __init__(self, prom_sensor):
        super().__init__()
        self.signals = WorkerSignals()
        self.prom_sensor = prom_sensor

    @QtCore.pyqtSlot()
    def run(self):
        """Emit the latest reading, sleep 30 seconds, and repeat.

        The loop has no exit condition, so this method never returns.
        """
        while True:
            print("Thread start")
            latest = self.prom_sensor.get_sensor_latest()
            self.signals.result.emit(float(latest))
            # Blocks this worker for the whole interval between readings.
            time.sleep(30)
            print("Thread complete")
class MainWindow(QtWidgets.QMainWindow):
    """Main window that shows the most recent sensor reading in a label.

    The constructor builds the UI, shows the window and starts one
    ``Worker`` on a private thread pool.
    """

    def __init__(self):
        super(MainWindow, self).__init__()
        self.threadpool = QtCore.QThreadPool()
        self.prom_sensor = GetPromSensor()
        # Last value written to the label; None until the first result.
        self.previous = None

        layout = QtWidgets.QGridLayout()
        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 uses fully scoped enums: the member lives on
        # Qt.AlignmentFlag. Qt.Alignment is not available in current PyQt6
        # releases and raises AttributeError there.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        widget = QtWidgets.QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)

        self.show()
        self.start_worker()

    def start_worker(self):
        """Create a worker, connect its result signal and queue it on the pool."""
        worker = Worker(prom_sensor=self.prom_sensor)
        worker.signals.result.connect(self.process_result)
        self.threadpool.start(worker)

    def process_result(self, temperature):
        """Update the label with ``temperature`` unless it equals the last value.

        Args:
            temperature: Value delivered by the worker's ``result`` signal.
        """
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setText(str(temperature))
if __name__ == "__main__":
    # Script entry point: create the application object, build the main
    # window, then hand control to the Qt event loop.
    app = QtWidgets.QApplication([])
    # Keep a reference so the window is not garbage-collected.
    window = MainWindow()
    app.exec()
不要使用 time.sleep,而是用 QTimer 每隔 T 秒启动一个 worker,让耗时的读取任务在另一个线程中运行。
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
class WorkerSignals(QtCore.QObject):
    """Holder for the signals a worker emits."""

    # Carries a single float argument.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """One-shot runnable: read the sensor once and emit the value.

    Args:
        prom_sensor: Object providing ``get_sensor_latest()``.
        signals: Object exposing a ``result`` signal; it is supplied by the
            caller rather than created here.
    """

    def __init__(self, prom_sensor, signals):
        super().__init__()
        self.signals = signals
        self.prom_sensor = prom_sensor

    def run(self):
        """Fetch the latest reading and emit it as a float on ``signals.result``."""
        self.signals.result.emit(float(self.prom_sensor.get_sensor_latest()))
class MainWindow(QtWidgets.QMainWindow):
    """Main window that refreshes a label with periodic sensor readings.

    Each call to ``request_data`` queues a one-shot ``Worker`` on the
    thread pool and schedules the next call with a single-shot timer.
    Results arrive through the shared ``WorkerSignals`` instance.
    """

    # Delay between successive sensor requests, in milliseconds.
    REFRESH_INTERVAL_MS = 5 * 1000

    def __init__(self):
        super(MainWindow, self).__init__()
        self.prom_sensor = GetPromSensor()
        self.threadpool = QtCore.QThreadPool()
        # One signal object is shared by every worker, so the slot is
        # connected once here.
        self.signals = WorkerSignals()
        self.signals.result.connect(self.process_result)
        # Last value written to the label; None until the first result.
        self.previous = None

        widget = QtWidgets.QWidget()
        self.setCentralWidget(widget)
        layout = QtWidgets.QGridLayout(widget)
        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 uses fully scoped enums: the member lives on
        # Qt.AlignmentFlag. Qt.Alignment is not available in current PyQt6
        # releases and raises AttributeError there.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        self.request_data()

    def request_data(self):
        """Queue one sensor read on the pool and schedule the next request."""
        worker = Worker(prom_sensor=self.prom_sensor, signals=self.signals)
        self.threadpool.start(worker)
        # Re-arm from here instead of sleeping inside the worker.
        QtCore.QTimer.singleShot(self.REFRESH_INTERVAL_MS, self.request_data)

    @QtCore.pyqtSlot(float)
    def process_result(self, temperature):
        """Update the label with ``temperature`` unless it equals the last value.

        Args:
            temperature: Value delivered by the ``result`` signal.
        """
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setNum(temperature)
if __name__ == "__main__":
    # Script entry point: create the application object, build and show
    # the main window, then hand control to the Qt event loop.
    app = QtWidgets.QApplication([])
    window = MainWindow()
    window.show()
    app.exec()
注意:pyqtSlot 装饰器只适用于 QObject 子类的方法;QRunnable 不是 QObject,因此不应在它的 run() 方法上使用该装饰器。
我正在从 Prometheus 数据库读取天气传感器数据,并每 30 秒更新一次显示这些数据的小部件。“每 30 秒”目前是通过在工作线程中调用 time.sleep 实现的,我猜应该有更好的做法。例如,我曾尝试在 MainWindow 上实现 closeEvent 方法,但程序始终无法退出。使用 QRunnable 执行周期性任务的正确方法是什么?
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
import time
class WorkerSignals(QtCore.QObject):
    """Holder for the signals a worker emits."""

    # Carries a single float argument.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """Runnable that keeps polling the sensor and reports each reading.

    Args:
        prom_sensor: Object providing ``get_sensor_latest()``; the value it
            returns is converted with ``float()`` before being emitted.
    """

    def __init__(self, prom_sensor):
        super().__init__()
        self.signals = WorkerSignals()
        self.prom_sensor = prom_sensor

    @QtCore.pyqtSlot()
    def run(self):
        """Emit the latest reading, sleep 30 seconds, and repeat.

        The loop has no exit condition, so this method never returns.
        """
        while True:
            print("Thread start")
            latest = self.prom_sensor.get_sensor_latest()
            self.signals.result.emit(float(latest))
            # Blocks this worker for the whole interval between readings.
            time.sleep(30)
            print("Thread complete")
class MainWindow(QtWidgets.QMainWindow):
    """Main window that shows the most recent sensor reading in a label.

    The constructor builds the UI, shows the window and starts one
    ``Worker`` on a private thread pool.
    """

    def __init__(self):
        super(MainWindow, self).__init__()
        self.threadpool = QtCore.QThreadPool()
        self.prom_sensor = GetPromSensor()
        # Last value written to the label; None until the first result.
        self.previous = None

        layout = QtWidgets.QGridLayout()
        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 uses fully scoped enums: the member lives on
        # Qt.AlignmentFlag. Qt.Alignment is not available in current PyQt6
        # releases and raises AttributeError there.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        widget = QtWidgets.QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)

        self.show()
        self.start_worker()

    def start_worker(self):
        """Create a worker, connect its result signal and queue it on the pool."""
        worker = Worker(prom_sensor=self.prom_sensor)
        worker.signals.result.connect(self.process_result)
        self.threadpool.start(worker)

    def process_result(self, temperature):
        """Update the label with ``temperature`` unless it equals the last value.

        Args:
            temperature: Value delivered by the worker's ``result`` signal.
        """
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setText(str(temperature))
if __name__ == "__main__":
    # Script entry point: create the application object, build the main
    # window, then hand control to the Qt event loop.
    app = QtWidgets.QApplication([])
    # Keep a reference so the window is not garbage-collected.
    window = MainWindow()
    app.exec()
不要使用 time.sleep,而是用 QTimer 每隔 T 秒启动一个 worker,让耗时的读取任务在另一个线程中运行。
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
class WorkerSignals(QtCore.QObject):
    """Holder for the signals a worker emits."""

    # Carries a single float argument.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """One-shot runnable: read the sensor once and emit the value.

    Args:
        prom_sensor: Object providing ``get_sensor_latest()``.
        signals: Object exposing a ``result`` signal; it is supplied by the
            caller rather than created here.
    """

    def __init__(self, prom_sensor, signals):
        super().__init__()
        self.signals = signals
        self.prom_sensor = prom_sensor

    def run(self):
        """Fetch the latest reading and emit it as a float on ``signals.result``."""
        self.signals.result.emit(float(self.prom_sensor.get_sensor_latest()))
class MainWindow(QtWidgets.QMainWindow):
    """Main window that refreshes a label with periodic sensor readings.

    Each call to ``request_data`` queues a one-shot ``Worker`` on the
    thread pool and schedules the next call with a single-shot timer.
    Results arrive through the shared ``WorkerSignals`` instance.
    """

    # Delay between successive sensor requests, in milliseconds.
    REFRESH_INTERVAL_MS = 5 * 1000

    def __init__(self):
        super(MainWindow, self).__init__()
        self.prom_sensor = GetPromSensor()
        self.threadpool = QtCore.QThreadPool()
        # One signal object is shared by every worker, so the slot is
        # connected once here.
        self.signals = WorkerSignals()
        self.signals.result.connect(self.process_result)
        # Last value written to the label; None until the first result.
        self.previous = None

        widget = QtWidgets.QWidget()
        self.setCentralWidget(widget)
        layout = QtWidgets.QGridLayout(widget)
        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 uses fully scoped enums: the member lives on
        # Qt.AlignmentFlag. Qt.Alignment is not available in current PyQt6
        # releases and raises AttributeError there.
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        self.request_data()

    def request_data(self):
        """Queue one sensor read on the pool and schedule the next request."""
        worker = Worker(prom_sensor=self.prom_sensor, signals=self.signals)
        self.threadpool.start(worker)
        # Re-arm from here instead of sleeping inside the worker.
        QtCore.QTimer.singleShot(self.REFRESH_INTERVAL_MS, self.request_data)

    @QtCore.pyqtSlot(float)
    def process_result(self, temperature):
        """Update the label with ``temperature`` unless it equals the last value.

        Args:
            temperature: Value delivered by the ``result`` signal.
        """
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setNum(temperature)
if __name__ == "__main__":
    # Script entry point: create the application object, build and show
    # the main window, then hand control to the Qt event loop.
    app = QtWidgets.QApplication([])
    window = MainWindow()
    window.show()
    app.exec()
注意:pyqtSlot 装饰器只适用于 QObject 子类的方法;QRunnable 不是 QObject,因此不应在它的 run() 方法上使用该装饰器。