python paho-mqtt 的单元测试不工作 - 简单的语法问题

python unittest for paho-mqtt not working - simple syntax issue

我正在尝试使用 python 的 unittest 包编写一个简单的测试,它只是检测是否存在代理连接。尽管建立了成功的代理连接,它似乎还是失败了,我 90% 确定这是语法问题——特别是 has_connected 布尔变量的定义。

import paho.mqtt.client as mqtt
import time     

class TestBrokerConnection(unittest.TestCase):  
    def setUp(self):
        self.client = mqtt.Client("Test Client")
        self.client.on_connect = self.on_connect
        self.broker = "10.0.2.4"
        self.port = 1883
        self.has_connected = False
   
    def on_connect(client, userdata, flags, rc): #connect function
        if rc==0:
            self.has_connected = True
   
    def test_connection(self): #test to check connection to broker
        self.client.connect(self.broker, self.port)
        self.client.loop_start()
        time.sleep(2)
        self.client.loop_stop()
        self.assertTrue(self.has_connected)

if __name__ == '__main__':
    unittest.main()

任何帮助将不胜感激:)

我复制了您的代码示例并使用了 paho.mqtt 客户端提供的连接示例

client.connect("mqtt.eclipse.org", 1883, 60)

我认为你的问题可能出在你的 on_connect 函数上,你正在引用 self.has_connected 但你没有将对 self 的引用传递到函数中。

这对我有效,如果将 self 添加到其中 on_connect 可以解决您遇到的问题,请告诉我!

class TestBrokerConnection(unittest.TestCase):
def setUp(self):
    self.client = mqtt.Client("Test Client")
    self.client.on_connect = self.on_connect
    self.broker = "mqtt.eclipse.org"
    self.port = 1883
    self.has_connected = False

def on_connect(self, client, userdata, flags, rc):  # connect function
    if rc == 0:
        self.has_connected = True

def test_connection(self):  # test to check connection to broker
    self.client.connect(self.broker, self.port)
    self.client.loop_start()
    time.sleep(2)
    self.client.loop_stop()
    self.assertTrue(self.has_connected)