为什么尝试从两个 ZeroMQ 套接字读取消息的代码失败了?
Why a code, trying to read messages from two ZeroMQ sockets, fails?
我在从两个 zmq 服务器(一个设置为 REQ|REP
和一个 PUB|SUB
)读取消息时遇到问题
两台服务器运行在另一台电脑上。当我只读取 REQ|REP
连接时,一切正常,但一旦我也尝试读取 PUB|SUB
连接,程序就会冻结(我猜它会永远等待消息)
from PyQt5 import QtCore, QtGui, QtWidgets
import zmq
import ui_mainwindow
class MainWindow(QtWidgets.QMainWindow, ui_mainwindow.Ui_MainWindow):
def __init__(self, parent = None):
super(MainWindow, self).__init__(parent)
self.context = zmq.Context()
try:
self.stateSocket = self.context.socket(zmq.REQ)
self.stateSocket.connect("tcp://134.105.89.197:5555")
except zmq.ZMQError as e:
print('States setup failed: ', e)
try:
self.context = zmq.Context()
self.anglesSocket = self.context.socket(zmq.SUB)
self.anglesSocket.connect("tcp://134.105.89.197:5556")
except zmq.ZMQError as e:
print('angles setup failed: ', e)
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.publishState)
self.timer.setInterval(500)
self.timer.start()
self.timer2 = QtCore.QTimer()
self.timer2.timeout.connect(self.publishAngles)
self.timer2.setInterval(500)
self.timer2.start()
# +more variables unrelated to problem
def publishState(self):
request= "a string"
try:
self.stateSocket.send_string(request)
self.reset = 0
message = self.stateSocket.recv()#flags=zmq.NOBLOCK)
values = [float(i) for i in message.decode("UTF-8").split(',')]
print("Status: ", message)
except zmq.ZMQError as e:
print('State communication: ', e)
values = [0] * 100
def publishAngles(self):
try:
message = anglesSocket.recv_string() # flags=zmq.NOBLOCK)
#values = [float(i) for i in message.decode("UTF-8").split(',')]
print("Angles: ", message)
except zmq.ZMQError as e:
print('Angles communication: ', e)
values = [0] * 100
编辑:添加了完整的相关代码。
我观察到的是死锁不是来自 REQ|REP
,这部分单独工作非常好。但是似乎 PUB|SUB
部分在定时器功能中不起作用。当我在 publishAngels()
中使用 while 循环制作一个最小示例时,它可以工作。
那么有没有一种优雅的方法可以在 Qt Timer 连接函数中使用 PUB|SUB
套接字?
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看
,然后再深入了解更多细节
Q: "Is there any stupid mistake I am overlooking?"
是的,有一些很容易提炼。
1)
到目前为止,不完整的可见 ZeroMQ 部分表现出主要的不确定性,即什么类型的订阅和其他安全保护设置(如果有的话)何时何地应用于 SUB
-socket-Archetype 接入点。这同样适用于 REQ
-socket-Archetype AccessPoint,除了出于明显原因的订阅管理相关设置种类。
2)
代码忽略了 d分布式-Finit- 已知规则的文档化原则State-A自动机的 (dFSA) 逻辑,硬连接到 REQ/REP
可扩展的正式通信原型中。使用正确的逻辑避免这种情况,不违反此处强制性的 REQ-REP-REQ-REP-REQ-REP 的 dFSA 步进器,并使 REQ
和 SUB
处理中的任何一个变得相互独立你有它。换句话说,忽略使用 zmq.NOBLOCK
标志的幼稚的 dFSA 规则也不能解决死锁。
如果您想认真成为分布式计算专家,必读 Pieter Hintjen 的精彩著作 "Code Connected, Volume 1"
我在从两个 zmq 服务器(一个设置为 REQ|REP
和一个 PUB|SUB
)读取消息时遇到问题
两台服务器运行在另一台电脑上。当我只读取 REQ|REP
连接时,一切正常,但一旦我也尝试读取 PUB|SUB
连接,程序就会冻结(我猜它会永远等待消息)
from PyQt5 import QtCore, QtGui, QtWidgets
import zmq
import ui_mainwindow
class MainWindow(QtWidgets.QMainWindow, ui_mainwindow.Ui_MainWindow):
def __init__(self, parent = None):
super(MainWindow, self).__init__(parent)
self.context = zmq.Context()
try:
self.stateSocket = self.context.socket(zmq.REQ)
self.stateSocket.connect("tcp://134.105.89.197:5555")
except zmq.ZMQError as e:
print('States setup failed: ', e)
try:
self.context = zmq.Context()
self.anglesSocket = self.context.socket(zmq.SUB)
self.anglesSocket.connect("tcp://134.105.89.197:5556")
except zmq.ZMQError as e:
print('angles setup failed: ', e)
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.publishState)
self.timer.setInterval(500)
self.timer.start()
self.timer2 = QtCore.QTimer()
self.timer2.timeout.connect(self.publishAngles)
self.timer2.setInterval(500)
self.timer2.start()
# +more variables unrelated to problem
def publishState(self):
request= "a string"
try:
self.stateSocket.send_string(request)
self.reset = 0
message = self.stateSocket.recv()#flags=zmq.NOBLOCK)
values = [float(i) for i in message.decode("UTF-8").split(',')]
print("Status: ", message)
except zmq.ZMQError as e:
print('State communication: ', e)
values = [0] * 100
def publishAngles(self):
try:
message = anglesSocket.recv_string() # flags=zmq.NOBLOCK)
#values = [float(i) for i in message.decode("UTF-8").split(',')]
print("Angles: ", message)
except zmq.ZMQError as e:
print('Angles communication: ', e)
values = [0] * 100
编辑:添加了完整的相关代码。
我观察到的是死锁不是来自 REQ|REP
,这部分单独工作非常好。但是似乎 PUB|SUB
部分在定时器功能中不起作用。当我在 publishAngels()
中使用 while 循环制作一个最小示例时,它可以工作。
那么有没有一种优雅的方法可以在 Qt Timer 连接函数中使用 PUB|SUB
套接字?
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看
,然后再深入了解更多细节
Q: "Is there any stupid mistake I am overlooking?"
是的,有一些很容易提炼。
1)
到目前为止,不完整的可见 ZeroMQ 部分表现出主要的不确定性,即什么类型的订阅和其他安全保护设置(如果有的话)何时何地应用于 SUB
-socket-Archetype 接入点。这同样适用于 REQ
-socket-Archetype AccessPoint,除了出于明显原因的订阅管理相关设置种类。
2)
代码忽略了 d分布式-Finit- 已知规则的文档化原则State-A自动机的 (dFSA) 逻辑,硬连接到 REQ/REP
可扩展的正式通信原型中。使用正确的逻辑避免这种情况,不违反此处强制性的 REQ-REP-REQ-REP-REQ-REP 的 dFSA 步进器,并使 REQ
和 SUB
处理中的任何一个变得相互独立你有它。换句话说,忽略使用 zmq.NOBLOCK
标志的幼稚的 dFSA 规则也不能解决死锁。
如果您想认真成为分布式计算专家,必读 Pieter Hintjen 的精彩著作 "Code Connected, Volume 1"