为什么尝试从两个 ZeroMQ 套接字读取消息的代码失败了?

Why a code, trying to read messages from two ZeroMQ sockets, fails?

我在从两个 zmq 服务器(一个设置为 REQ|REP 和一个 PUB|SUB)读取消息时遇到问题

两台服务器运行在另一台电脑上。当我只读取 REQ|REP 连接时,一切正常,但一旦我也尝试读取 PUB|SUB 连接,程序就会冻结(我猜它会永远等待消息)

from PyQt5 import QtCore, QtGui, QtWidgets
import zmq
import ui_mainwindow

class MainWindow(QtWidgets.QMainWindow, ui_mainwindow.Ui_MainWindow):
    def __init__(self, parent = None):
        super(MainWindow, self).__init__(parent)
        self.context = zmq.Context()
        try:
            self.stateSocket = self.context.socket(zmq.REQ)
            self.stateSocket.connect("tcp://134.105.89.197:5555")
        except zmq.ZMQError as e:
            print('States setup failed: ', e)

        try:
            self.context = zmq.Context()
            self.anglesSocket = self.context.socket(zmq.SUB)
            self.anglesSocket.connect("tcp://134.105.89.197:5556")
        except zmq.ZMQError as e:
            print('angles setup failed: ', e)

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.publishState) 
        self.timer.setInterval(500)  
        self.timer.start()

        self.timer2 = QtCore.QTimer()
        self.timer2.timeout.connect(self.publishAngles) 
        self.timer2.setInterval(500)
        self.timer2.start()

        # +more variables unrelated to problem




    def publishState(self):
        request= "a string"
        try:
            self.stateSocket.send_string(request)
            self.reset = 0
            message = self.stateSocket.recv()#flags=zmq.NOBLOCK)
            values = [float(i) for i in message.decode("UTF-8").split(',')]
            print("Status: ", message)

        except zmq.ZMQError as e:
            print('State communication: ', e)
            values = [0] * 100

    def publishAngles(self):
        try:
            message = anglesSocket.recv_string()  # flags=zmq.NOBLOCK)
            #values = [float(i) for i in message.decode("UTF-8").split(',')]
            print("Angles: ", message)

            except zmq.ZMQError as e:
                print('Angles communication: ', e)
                values = [0] * 100

编辑:添加了完整的相关代码。 我观察到的是死锁不是来自 REQ|REP,这部分单独工作非常好。但是似乎 PUB|SUB 部分在定时器功能中不起作用。当我在 publishAngels() 中使用 while 循环制作一个最小示例时,它可以工作。

那么有没有一种优雅的方法可以在 Qt Timer 连接函数中使用 PUB|SUB 套接字?


如果你从未使用过 ZeroMQ,
你可能会喜欢先看看
,然后再深入了解更多细节



Q: "Is there any stupid mistake I am overlooking?"

是的,有一些很容易提炼。

1)
到目前为止,不完整的可见 ZeroMQ 部分表现出主要的不确定性,即什么类型的订阅和其他安全保护设置(如果有的话)何时何地应用于 SUB-socket-Archetype 接入点。这同样适用于 REQ-socket-Archetype AccessPoint,除了出于明显原因的订阅管理相关设置种类。

2)
代码忽略了 d分布式-Finit- 已知规则的文档化原则State-A自动机的 (dFSA) 逻辑,硬连接到 REQ/REP 可扩展的正式通信原型中。使用正确的逻辑避免这种情况,不违反此处强制性的 REQ-REP-REQ-REP-REQ-REP 的 dFSA 步进器,并使 REQSUB 处理中的任何一个变得相互独立你有它。换句话说,忽略使用 zmq.NOBLOCK 标志的幼稚的 dFSA 规则也不能解决死锁。


如果您想认真成为分布式计算专家,必读 Pieter Hintjen 的精彩著作 "Code Connected, Volume 1"