MQTT Broker 不按时传递发布者发送的消息
MQTT Broker does not deliver the messages sent by publisher, on time
我写了这个 MQTT 发布者代码:
import paho.mqtt.client as mqtt
import time
HOST = "localhost"
PORT = 1883
KEEP_ALIVE_INT = 100
TOPIC = "noti"
def sendMsg():
MSG = ["1111", "2222", "3333", "4444", "5555"]
i = 0
try:
while i<5:
client.publish(TOPIC, MSG[i], qos=0)
i+=1
time.sleep(1)
except Exception as e:
print("Caught Exception: " + e)
def onConnect(client, userdata, flags, rc):
if rc == 0:
print("Connected successfully")
sendMsg()
else:
print("Connection failed, result code: " + str(rc))
def onPublish(client, userdata, mid):
print ("Message is published")
client = mqtt.Client("pub")
client.on_connect = onConnect
client.on_publish = onPublish
client.connect(HOST, PORT, KEEP_ALIVE_INT)
client.loop_forever()
并且,以下是 MQTT 订阅者代码:
import paho.mqtt.client as mqtt
import time
HOST = "localhost"
PORT = 1883
KEEP_ALIVE_INT = 100
TOPIC = "noti"
def onConnect(client, userdata, flags, rc):
if rc == 0:
print("=> Connected successfully")
client.subscribe(TOPIC, 0)
else:
print("=> Connection failed, result code: " + str(rc))
def onSubscribe(mosq, obj, mid, granted_qos):
print ("=> Subscribed to topic: " + TOPIC)
print ("Granted QOS: "+str(granted_qos))
def onMessage(client, userdata, msg):
print("=> Received message: " + msg.topic +" - " + msg.payload.decode("utf-8"))
client = mqtt.Client("sub")
client.on_message = onMessage
client.on_connect = onConnect
client.on_subscribe = onSubscribe
client.connect(HOST, PORT, KEEP_ALIVE_INT )
client.loop_forever()
我在我的电脑上使用 Mosquitto 代理。
发布每 1 秒完成一次,但在发布所有 5 条消息后,我可以看到打印“消息已发布”5 次。另外,订阅者在 5 秒后一起接收消息,而不是每 1 秒接收一次消息。
请大家帮我理解错误,建议修改。
这是因为所有回调和消息处理都发生在客户端网络循环线程上,而您通过不从 on_connect()
回调中返回来阻塞该线程。
因此对 client.publish()
的调用会排队等待 on_connect()
回调 returns。
您需要找到一种不在客户端循环上触发 sendMsg()
函数的方法。 (可能在单独的线程上)
我写了这个 MQTT 发布者代码:
import paho.mqtt.client as mqtt
import time
HOST = "localhost"
PORT = 1883
KEEP_ALIVE_INT = 100
TOPIC = "noti"
def sendMsg():
MSG = ["1111", "2222", "3333", "4444", "5555"]
i = 0
try:
while i<5:
client.publish(TOPIC, MSG[i], qos=0)
i+=1
time.sleep(1)
except Exception as e:
print("Caught Exception: " + e)
def onConnect(client, userdata, flags, rc):
if rc == 0:
print("Connected successfully")
sendMsg()
else:
print("Connection failed, result code: " + str(rc))
def onPublish(client, userdata, mid):
print ("Message is published")
client = mqtt.Client("pub")
client.on_connect = onConnect
client.on_publish = onPublish
client.connect(HOST, PORT, KEEP_ALIVE_INT)
client.loop_forever()
并且,以下是 MQTT 订阅者代码:
import paho.mqtt.client as mqtt
import time
HOST = "localhost"
PORT = 1883
KEEP_ALIVE_INT = 100
TOPIC = "noti"
def onConnect(client, userdata, flags, rc):
if rc == 0:
print("=> Connected successfully")
client.subscribe(TOPIC, 0)
else:
print("=> Connection failed, result code: " + str(rc))
def onSubscribe(mosq, obj, mid, granted_qos):
print ("=> Subscribed to topic: " + TOPIC)
print ("Granted QOS: "+str(granted_qos))
def onMessage(client, userdata, msg):
print("=> Received message: " + msg.topic +" - " + msg.payload.decode("utf-8"))
client = mqtt.Client("sub")
client.on_message = onMessage
client.on_connect = onConnect
client.on_subscribe = onSubscribe
client.connect(HOST, PORT, KEEP_ALIVE_INT )
client.loop_forever()
我在我的电脑上使用 Mosquitto 代理。
发布每 1 秒完成一次,但在发布所有 5 条消息后,我可以看到打印“消息已发布”5 次。另外,订阅者在 5 秒后一起接收消息,而不是每 1 秒接收一次消息。
请大家帮我理解错误,建议修改。
这是因为所有回调和消息处理都发生在客户端网络循环线程上,而您通过不从 on_connect()
回调中返回来阻塞该线程。
因此对 client.publish()
的调用会排队等待 on_connect()
回调 returns。
您需要找到一种不在客户端循环上触发 sendMsg()
函数的方法。 (可能在单独的线程上)