python paho-mqtt 的单元测试不工作 - 简单的语法问题
python unittest for paho-mqtt not working - simple syntax issue
我正在尝试使用 python 的 unittest 包编写一个简单的测试,它只是检测是否存在代理连接。尽管建立了成功的代理连接,它似乎还是失败了,我 90% 确定这是语法问题——特别是 has_connected 布尔变量的定义。
import paho.mqtt.client as mqtt
import time
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "10.0.2.4"
self.port = 1883
self.has_connected = False
def on_connect(client, userdata, flags, rc): #connect function
if rc==0:
self.has_connected = True
def test_connection(self): #test to check connection to broker
self.client.connect(self.broker, self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
if __name__ == '__main__':
unittest.main()
任何帮助将不胜感激:)
我复制了您的代码示例并使用了 paho.mqtt 客户端提供的连接示例
client.connect("mqtt.eclipse.org", 1883, 60)
我认为你的问题可能出在你的 on_connect 函数上,你正在引用 self.has_connected 但你没有将对 self 的引用传递到函数中。
这对我有效,如果将 self 添加到其中 on_connect 可以解决您遇到的问题,请告诉我!
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "mqtt.eclipse.org"
self.port = 1883
self.has_connected = False
def on_connect(self, client, userdata, flags, rc): # connect function
if rc == 0:
self.has_connected = True
def test_connection(self): # test to check connection to broker
self.client.connect(self.broker, self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
我正在尝试使用 python 的 unittest 包编写一个简单的测试,它只是检测是否存在代理连接。尽管建立了成功的代理连接,它似乎还是失败了,我 90% 确定这是语法问题——特别是 has_connected 布尔变量的定义。
import paho.mqtt.client as mqtt
import time
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "10.0.2.4"
self.port = 1883
self.has_connected = False
def on_connect(client, userdata, flags, rc): #connect function
if rc==0:
self.has_connected = True
def test_connection(self): #test to check connection to broker
self.client.connect(self.broker, self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
if __name__ == '__main__':
unittest.main()
任何帮助将不胜感激:)
我复制了您的代码示例并使用了 paho.mqtt 客户端提供的连接示例
client.connect("mqtt.eclipse.org", 1883, 60)
我认为你的问题可能出在你的 on_connect 函数上,你正在引用 self.has_connected 但你没有将对 self 的引用传递到函数中。
这对我有效,如果将 self 添加到其中 on_connect 可以解决您遇到的问题,请告诉我!
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "mqtt.eclipse.org"
self.port = 1883
self.has_connected = False
def on_connect(self, client, userdata, flags, rc): # connect function
if rc == 0:
self.has_connected = True
def test_connection(self): # test to check connection to broker
self.client.connect(self.broker, self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)