带有qt事件循环的pyzmq
pyzmq with qt event loop
我在我的 qt 应用程序中使用 pyzmq。
我在 first link 的邮件列表中找到了一些过去的解决方案。所以,这是我的代码 link.
import zmq
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtWidgets import QApplication, QWidget
class ChatApp(QWidget):
def __init__(self):
super(ChatApp, self).__init__()
self._zmq_context = zmq.Context()
self._zmq_sock = self._zmq_context.socket(zmq.SUB)
self._zmq_sock.connect("tcp://localhost:5556")
self._zmq_sock.setsockopt(zmq.SUBSCRIBE, b"bm_chat")
self.read_noti = QSocketNotifier(self._zmq_sock.getsockopt(zmq.FD),
QSocketNotifier.Read,
self)
self.read_noti.activated.connect(self.on_read_msg)
def on_read_msg(self, _):
self.read_noti.setEnabled(False)
flags = self._zmq_sock.getsockopt(zmq.EVENTS)
if flags & zmq.POLLIN:
msg = self._zmq_sock.recv_multipart()
topic = msg[0]
data = msg[1]
print(topic, data)
elif flags & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif flags & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
else:
print("[Socket] FAILURE")
self.read_noti.setEnabled(True)
if __name__ == '__main__':
app = QApplication([])
win = ChatApp()
win.show()
app.exec_()
但是,正如预期的那样,消息触发一次后,就再也没有发生过。这是我的留言
[Socket] FAILURE
b'bm_trade' b'hello0'
所以我搜索了其他解决方案here,启用qt通知后读取self._zmq_sock.getsockopt(zmq.EVENTS)。所以我在最后一行更改了我的代码
def on_read_msg(self, _):
self.read_noti.setEnabled(False)
flags = self._zmq_sock.getsockopt(zmq.EVENTS)
if flags & zmq.POLLIN:
msg = self._zmq_sock.recv_multipart()
topic = msg[0]
data = msg[1]
print(topic, data)
elif flags & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif flags & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
else:
print("[Socket] FAILURE")
self.read_noti.setEnabled(True)
self._zmq_sock.getsockopt(zmq.EVENTS) // Here is fixed
它工作得很好,直到数据速率很低。这是我的 PUB 服务器代码。
from time import sleep
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5557")
def ms(millisec):
return millisec / 1000
if __name__ == '__main__':
count = 0
while True:
socket.send_multipart(
[b'bm_trade', bytes(('hello' + str(count)).encode('utf-8'))])
count += 1
sleep(ms(10))
该技巧仅适用于超过 10 毫秒的事件时间间隔。如果将 sleep(ms()) 更改为 10 以下,客户端也会被触发一次,并且永远不会触发。
有人会说没关系,但在我的应用程序中,延迟非常低,所有消息都应该始终在 gui 中可见。
为什么会出现这个问题,如何解决?
你只需要在有数据的情况下继续阅读,如下所示:
def on_read_msg(self):
self.read_noti.setEnabled(False)
if self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
while self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
topic, data = self._zmq_sock.recv_multipart()
print(topic, data)
elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
self.read_noti.setEnabled(True)
我在我的 qt 应用程序中使用 pyzmq。
我在 first link 的邮件列表中找到了一些过去的解决方案。所以,这是我的代码 link.
import zmq
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtWidgets import QApplication, QWidget
class ChatApp(QWidget):
def __init__(self):
super(ChatApp, self).__init__()
self._zmq_context = zmq.Context()
self._zmq_sock = self._zmq_context.socket(zmq.SUB)
self._zmq_sock.connect("tcp://localhost:5556")
self._zmq_sock.setsockopt(zmq.SUBSCRIBE, b"bm_chat")
self.read_noti = QSocketNotifier(self._zmq_sock.getsockopt(zmq.FD),
QSocketNotifier.Read,
self)
self.read_noti.activated.connect(self.on_read_msg)
def on_read_msg(self, _):
self.read_noti.setEnabled(False)
flags = self._zmq_sock.getsockopt(zmq.EVENTS)
if flags & zmq.POLLIN:
msg = self._zmq_sock.recv_multipart()
topic = msg[0]
data = msg[1]
print(topic, data)
elif flags & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif flags & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
else:
print("[Socket] FAILURE")
self.read_noti.setEnabled(True)
if __name__ == '__main__':
app = QApplication([])
win = ChatApp()
win.show()
app.exec_()
但是,正如预期的那样,消息触发一次后,就再也没有发生过。这是我的留言
[Socket] FAILURE
b'bm_trade' b'hello0'
所以我搜索了其他解决方案here,启用qt通知后读取self._zmq_sock.getsockopt(zmq.EVENTS)。所以我在最后一行更改了我的代码
def on_read_msg(self, _):
self.read_noti.setEnabled(False)
flags = self._zmq_sock.getsockopt(zmq.EVENTS)
if flags & zmq.POLLIN:
msg = self._zmq_sock.recv_multipart()
topic = msg[0]
data = msg[1]
print(topic, data)
elif flags & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif flags & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
else:
print("[Socket] FAILURE")
self.read_noti.setEnabled(True)
self._zmq_sock.getsockopt(zmq.EVENTS) // Here is fixed
它工作得很好,直到数据速率很低。这是我的 PUB 服务器代码。
from time import sleep
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5557")
def ms(millisec):
return millisec / 1000
if __name__ == '__main__':
count = 0
while True:
socket.send_multipart(
[b'bm_trade', bytes(('hello' + str(count)).encode('utf-8'))])
count += 1
sleep(ms(10))
该技巧仅适用于超过 10 毫秒的事件时间间隔。如果将 sleep(ms()) 更改为 10 以下,客户端也会被触发一次,并且永远不会触发。
有人会说没关系,但在我的应用程序中,延迟非常低,所有消息都应该始终在 gui 中可见。
为什么会出现这个问题,如何解决?
你只需要在有数据的情况下继续阅读,如下所示:
def on_read_msg(self):
self.read_noti.setEnabled(False)
if self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
while self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
topic, data = self._zmq_sock.recv_multipart()
print(topic, data)
elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("[Socket] zmq.POLLOUT")
elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLERR:
print("[Socket] zmq.POLLERR")
self.read_noti.setEnabled(True)