如何在运行时将 ZMQ 的输出显示到 Qt GUI?
How to display an output from ZMQ to a Qt GUI in run time?
我正在尝试获取 ZeroMQ 客户端的输出,并通过 Qt Designer 生成的 Python 脚本显示它。
代码已经能正确接收数据,但我无法在接收数据的同时把它显示在 GUI 窗口上,窗口只显示第一行。我尝试过线程和 subprocess,但都不起作用。
我在 Ui_Form 类的 consumer 函数中可以正常接收数据:
class Ui_Form(object):
def setupUi(self, Form):
def consumer(self):
    """Pull float32 frames from a ZMQ PULL socket and append them to the text edits.

    Each frame is cut at its first zero value, printed, and then read as
    interleaved (signal, bandwidth) pairs: signals are appended to
    ``self.textEdit`` and bandwidths to ``self.textEdit_2``. The receive loop
    has no exit condition.
    """
    # Only referenced by the disabled debug print below.
    consumer_id = random.randrange(1, 10005)
    # print("I am consumer #%s" % (consumer_id))
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://127.0.0.1:5557")
    while True:
        raw = receiver.recv()
        # print(time.time())
        data = np.frombuffer(raw, dtype="float32")
        # Keep only the samples that precede the first zero value.
        zero_positions = np.where(data == 0)[0]
        data = data[: zero_positions[0]]
        print(data)
        # Even indices hold signals, odd indices hold bandwidths.
        pairs = [(data[k], data[k + 1]) for k in range(0, len(data), 2)]
        for signal, width in pairs:
            self.textEdit.append(str(signal) + "\n")
            self.textEdit_2.append(str(width) + "\n")
当我运行主程序时:
if __name__ == "__main__":
    # Create the application and show a plain QWidget populated by Ui_Form.
    app = QtWidgets.QApplication(sys.argv)
    Form = QtWidgets.QWidget()
    ui = Ui_Form()
    ui.setupUi(Form)
    Form.show()
    # consumer() is called before the Qt event loop is started below.
    ui.consumer()
    exit_code = app.exec_()
    sys.exit(exit_code)
数据接收正确,但无法显示。
解释:
问题的原因是无限循环阻塞了 Qt 事件循环,使其无法执行更新 GUI、监听操作系统事件等任务。
解决方案:
你必须实现既能与 ZMQ 通信、又不阻塞事件循环的逻辑,为此有几种方案(下面先给出我测试时使用的服务器示例):
server.py
import random
import time
import zmq
import numpy as np
import logging
# Log every outgoing frame at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# PUSH socket bound on port 5557.
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    # Ten consecutive integers from a random start, each followed by its square.
    start = random.randint(0, 10)
    d = [value for x in range(start, start + 10) for value in (x, x ** 2)]
    buf = np.array(d, dtype="float32").tobytes()
    logging.debug(buf)
    socket.send(buf)
    # Send one frame per second.
    time.sleep(1)
1. threading.Thread
import sys
import threading
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class ZMQReceiver(QtCore.QObject):
    """Receive ZMQ messages on a background thread and re-emit them as a Qt signal."""

    # Emitted with the raw bytes of every message received.
    dataChanged = QtCore.pyqtSignal(bytes)

    def start(self):
        """Start the receive loop in a daemon thread."""
        worker = threading.Thread(target=self._execute, daemon=True)
        worker.start()

    def _execute(self):
        """Connect a PULL socket and emit ``dataChanged`` for every message received."""
        ctx = zmq.Context()
        receiver = ctx.socket(zmq.PULL)
        receiver.connect("tcp://127.0.0.1:5557")
        while True:
            message = receiver.recv()
            self.dataChanged.emit(message)
class Widget(QtWidgets.QWidget):
    """Two side-by-side read-only text panes fed by a ZMQReceiver."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for pane in (self.logedit_1, self.logedit_2):
            layout.addWidget(pane)

        # The receiver is created with this widget as its parent.
        receiver = ZMQReceiver(self)
        receiver.dataChanged.connect(self.on_data_changed)
        receiver.start()

    @QtCore.pyqtSlot(bytes)
    def on_data_changed(self, buff):
        """Decode ``buff`` as float32 values and append each pair to the panes.

        Values at even indices go to ``logedit_1`` and the following value at
        the odd index goes to ``logedit_2``.
        """
        samples = np.frombuffer(buff, dtype="float32")
        pairs = [(samples[k], samples[k + 1]) for k in range(0, len(samples), 2)]
        for signal, width in pairs:
            self.logedit_1.append(str(signal))
            self.logedit_2.append(str(width))
if __name__ == "__main__":
    # Create the application, show the widget and hand control to Qt.
    app = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
2. QSocketNotifier
import sys
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only text panes fed by a ZMQ PULL socket watched by a QSocketNotifier."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = zmq.Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        # Port 5557 matches the PUSH server and the other client examples.
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Have the Qt event loop watch the socket's file descriptor for reads.
        self.read_notifier = QtCore.QSocketNotifier(
            self.consumer_receiver.getsockopt(zmq.FD),
            QtCore.QSocketNotifier.Read,
            self,
        )
        self.read_notifier.activated.connect(self.on_read_msg)

    @QtCore.pyqtSlot()
    def on_read_msg(self):
        """Drain every pending message and append its values to the panes.

        Each message is decoded as float32 values; values at even indices go
        to ``logedit_1`` and the following value goes to ``logedit_2``.
        """
        # Pause notifications while draining; the finally clause re-enables
        # them even if receiving or decoding raises.
        self.read_notifier.setEnabled(False)
        try:
            # Keep reading until ZMQ reports no more readable messages.
            while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
                buff = self.consumer_receiver.recv()
                data = np.frombuffer(buff, dtype="float32")
                bandwidth = []
                signals = []
                for i in range(0, len(data), 2):
                    signals.append(data[i])
                    bandwidth.append(data[i + 1])
                for sg, bw in zip(signals, bandwidth):
                    self.logedit_1.append(str(sg))
                    self.logedit_2.append(str(bw))
        finally:
            self.read_notifier.setEnabled(True)
if __name__ == "__main__":
    # Create the application, show the widget and hand control to Qt.
    app = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
3. asyncio
import sys
import asyncio
from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose
import zmq
from zmq.asyncio import Context, Poller
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only text panes fed by a zmq.asyncio PULL socket."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for pane in (self.logedit_1, self.logedit_2):
            layout.addWidget(pane)

        ctx = Context()
        self.consumer_receiver = ctx.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Register the socket for read events with the asyncio poller.
        self.poller = Poller()
        self.poller.register(self.consumer_receiver, zmq.POLLIN)

    async def start_consumer(self):
        """Await messages in an endless loop and append their values to the panes.

        Each message is decoded as float32 values; values at even indices go
        to ``logedit_1`` and the following value goes to ``logedit_2``.
        """
        while True:
            ready = dict(await self.poller.poll())
            if self.consumer_receiver not in ready:
                continue
            buff = await self.consumer_receiver.recv()
            samples = np.frombuffer(buff, dtype="float32")
            pairs = [(samples[k], samples[k + 1]) for k in range(0, len(samples), 2)]
            for signal, width in pairs:
                self.logedit_1.append(str(signal))
                self.logedit_2.append(str(width))

    @asyncClose
    async def closeEvent(self, event):
        """Close the ZMQ socket when the widget is closed."""
        self.consumer_receiver.close()
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    # Install the Qt-backed QEventLoop as the asyncio event loop.
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    window = Widget()
    window.show()

    consumer = window.start_consumer()
    try:
        loop.run_until_complete(consumer)
    except asyncio.CancelledError:
        print("start_consumer is cancelled now")
    finally:
        loop.close()
我正在尝试获取 ZeroMQ 客户端的输出,并通过 Qt Designer 生成的 Python 脚本显示它。代码已经能正确接收数据,但我无法在接收数据的同时把它显示在 GUI 窗口上,窗口只显示第一行。我尝试过线程和 subprocess,但都不起作用。
我在 Ui_Form 类的 consumer 函数中可以正常接收数据:
class Ui_Form(object):
def setupUi(self, Form):
def consumer(self):
    """Pull float32 frames from a ZMQ PULL socket and append them to the text edits.

    Each frame is cut at its first zero value, printed, and then read as
    interleaved (signal, bandwidth) pairs: signals are appended to
    ``self.textEdit`` and bandwidths to ``self.textEdit_2``. The receive loop
    has no exit condition.
    """
    # Only referenced by the disabled debug print below.
    consumer_id = random.randrange(1, 10005)
    # print("I am consumer #%s" % (consumer_id))
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PULL)
    receiver.connect("tcp://127.0.0.1:5557")
    while True:
        raw = receiver.recv()
        # print(time.time())
        data = np.frombuffer(raw, dtype="float32")
        # Keep only the samples that precede the first zero value.
        zero_positions = np.where(data == 0)[0]
        data = data[: zero_positions[0]]
        print(data)
        # Even indices hold signals, odd indices hold bandwidths.
        pairs = [(data[k], data[k + 1]) for k in range(0, len(data), 2)]
        for signal, width in pairs:
            self.textEdit.append(str(signal) + "\n")
            self.textEdit_2.append(str(width) + "\n")
当我运行主程序时:
if __name__ == "__main__":
    # Create the application and show a plain QWidget populated by Ui_Form.
    app = QtWidgets.QApplication(sys.argv)
    Form = QtWidgets.QWidget()
    ui = Ui_Form()
    ui.setupUi(Form)
    Form.show()
    # consumer() is called before the Qt event loop is started below.
    ui.consumer()
    exit_code = app.exec_()
    sys.exit(exit_code)
数据接收正确,但无法显示。
解释:
问题的原因是无限循环阻塞了 Qt 事件循环,使其无法执行更新 GUI、监听操作系统事件等任务。
解决方案:
你必须实现既能与 ZMQ 通信、又不阻塞事件循环的逻辑,为此有几种方案(下面先给出我测试时使用的服务器示例):
server.py
import random
import time
import zmq
import numpy as np
import logging
# Log every outgoing frame at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# PUSH socket bound on port 5557.
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
    # Ten consecutive integers from a random start, each followed by its square.
    start = random.randint(0, 10)
    d = [value for x in range(start, start + 10) for value in (x, x ** 2)]
    buf = np.array(d, dtype="float32").tobytes()
    logging.debug(buf)
    socket.send(buf)
    # Send one frame per second.
    time.sleep(1)
1. threading.Thread
import sys
import threading
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class ZMQReceiver(QtCore.QObject):
    """Receive ZMQ messages on a background thread and re-emit them as a Qt signal."""

    # Emitted with the raw bytes of every message received.
    dataChanged = QtCore.pyqtSignal(bytes)

    def start(self):
        """Start the receive loop in a daemon thread."""
        worker = threading.Thread(target=self._execute, daemon=True)
        worker.start()

    def _execute(self):
        """Connect a PULL socket and emit ``dataChanged`` for every message received."""
        ctx = zmq.Context()
        receiver = ctx.socket(zmq.PULL)
        receiver.connect("tcp://127.0.0.1:5557")
        while True:
            message = receiver.recv()
            self.dataChanged.emit(message)
class Widget(QtWidgets.QWidget):
    """Two side-by-side read-only text panes fed by a ZMQReceiver."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for pane in (self.logedit_1, self.logedit_2):
            layout.addWidget(pane)

        # The receiver is created with this widget as its parent.
        receiver = ZMQReceiver(self)
        receiver.dataChanged.connect(self.on_data_changed)
        receiver.start()

    @QtCore.pyqtSlot(bytes)
    def on_data_changed(self, buff):
        """Decode ``buff`` as float32 values and append each pair to the panes.

        Values at even indices go to ``logedit_1`` and the following value at
        the odd index goes to ``logedit_2``.
        """
        samples = np.frombuffer(buff, dtype="float32")
        pairs = [(samples[k], samples[k + 1]) for k in range(0, len(samples), 2)]
        for signal, width in pairs:
            self.logedit_1.append(str(signal))
            self.logedit_2.append(str(width))
if __name__ == "__main__":
    # Create the application, show the widget and hand control to Qt.
    app = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
2. QSocketNotifier
import sys
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only text panes fed by a ZMQ PULL socket watched by a QSocketNotifier."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = zmq.Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        # Port 5557 matches the PUSH server and the other client examples.
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Have the Qt event loop watch the socket's file descriptor for reads.
        self.read_notifier = QtCore.QSocketNotifier(
            self.consumer_receiver.getsockopt(zmq.FD),
            QtCore.QSocketNotifier.Read,
            self,
        )
        self.read_notifier.activated.connect(self.on_read_msg)

    @QtCore.pyqtSlot()
    def on_read_msg(self):
        """Drain every pending message and append its values to the panes.

        Each message is decoded as float32 values; values at even indices go
        to ``logedit_1`` and the following value goes to ``logedit_2``.
        """
        # Pause notifications while draining; the finally clause re-enables
        # them even if receiving or decoding raises.
        self.read_notifier.setEnabled(False)
        try:
            # Keep reading until ZMQ reports no more readable messages.
            while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
                buff = self.consumer_receiver.recv()
                data = np.frombuffer(buff, dtype="float32")
                bandwidth = []
                signals = []
                for i in range(0, len(data), 2):
                    signals.append(data[i])
                    bandwidth.append(data[i + 1])
                for sg, bw in zip(signals, bandwidth):
                    self.logedit_1.append(str(sg))
                    self.logedit_2.append(str(bw))
        finally:
            self.read_notifier.setEnabled(True)
if __name__ == "__main__":
    # Create the application, show the widget and hand control to Qt.
    app = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
3. asyncio
import sys
import asyncio
from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose
import zmq
from zmq.asyncio import Context, Poller
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only text panes fed by a zmq.asyncio PULL socket."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for pane in (self.logedit_1, self.logedit_2):
            layout.addWidget(pane)

        ctx = Context()
        self.consumer_receiver = ctx.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Register the socket for read events with the asyncio poller.
        self.poller = Poller()
        self.poller.register(self.consumer_receiver, zmq.POLLIN)

    async def start_consumer(self):
        """Await messages in an endless loop and append their values to the panes.

        Each message is decoded as float32 values; values at even indices go
        to ``logedit_1`` and the following value goes to ``logedit_2``.
        """
        while True:
            ready = dict(await self.poller.poll())
            if self.consumer_receiver not in ready:
                continue
            buff = await self.consumer_receiver.recv()
            samples = np.frombuffer(buff, dtype="float32")
            pairs = [(samples[k], samples[k + 1]) for k in range(0, len(samples), 2)]
            for signal, width in pairs:
                self.logedit_1.append(str(signal))
                self.logedit_2.append(str(width))

    @asyncClose
    async def closeEvent(self, event):
        """Close the ZMQ socket when the widget is closed."""
        self.consumer_receiver.close()
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    # Install the Qt-backed QEventLoop as the asyncio event loop.
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    window = Widget()
    window.show()

    consumer = window.start_consumer()
    try:
        loop.run_until_complete(consumer)
    except asyncio.CancelledError:
        print("start_consumer is cancelled now")
    finally:
        loop.close()