mqtt 客户端根据传入消息条件发布
mqtt client publish based on incoming message conditions
我正在使用 Python 的 paho-mqtt 库,配合一个移动端 MQTT 客户端应用程序和 test.mosquitto.org 服务器/代理(broker)来试验 MQTT。
下面这个基本脚本在连接到 test.mosquitto.org 服务器后可以正常工作:我可以从移动端 MQTT 客户端应用程序向该脚本发布消息,脚本也会通过 def publish(client): 函数每 20 秒向移动应用程序发布一条测试消息。
import random
import time
from paho.mqtt import client as mqtt_client
# Connection settings for the test.mosquitto.org broker.
broker = 'test.mosquitto.org'
port = 1883
# Fixed client ID: the value is a constant, it is not generated randomly.
client_id = "test_1"
topic_to_publish = "laptop/publish"
topic_to_listen = "mobile/publish"
# NOTE: '*' is not an MQTT wildcard (the wildcards are '+' and '#'), so as a
# topic filter this would only match the literal topic "testing/*".
topic_to_wildcard = "testing/*"
username = ""
password = ""
def connect_mqtt():
    """Create the MQTT client, register its callbacks and connect.

    Returns:
        The ``mqtt_client.Client`` on which ``connect()`` has been called.
        The caller still has to run the network loop (``loop_start()`` or
        ``loop_forever()``) for the callbacks to fire.
    """
    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the success branch of the original code.
        if rc == 0:
            client.subscribe(topic_to_listen)
            # Report the topic that was actually subscribed.
            print(f"Connected to MQTT Broker on topic: {topic_to_listen}")
        else:
            # print() does not apply %-formatting to extra arguments, so the
            # return code is interpolated explicitly.
            print(f"Failed to connect, return code {rc}\n")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    # Register each callback once, before connecting.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(broker, port)
    return client
def publish(client):
    """Publish a numbered greeting to ``topic_to_publish`` every 20 seconds.

    Never returns; the outcome of each attempt is printed to stdout.
    """
    sent = 0
    while True:
        time.sleep(20)
        text = f"hello from {client_id}: {sent}"
        outcome = client.publish(topic_to_publish, text)
        # The first element of the publish result is the status; 0 is
        # treated as success.
        if outcome[0] != 0:
            print(f"Failed to send message to topic {topic_to_publish}")
        else:
            print(f"Send {text} to topic {topic_to_publish}")
        sent += 1
def on_message(client, userdata, msg):
    """Callback for a PUBLISH message received from the server.

    Prints the message topic followed by ``str()`` of its payload.
    """
    details = " ".join((msg.topic, str(msg.payload)))
    print("Message received-> " + details)
def run():
    """Connect, start the background network loop, then publish forever."""
    mqtt = connect_mqtt()
    mqtt.loop_start()
    publish(mqtt)


if __name__ == '__main__':
    run()
有人可以告诉我如何修改 def publish(client): 函数吗?我希望它不再是每 20 秒发送一次消息的 while 循环,而是仅在收到来自移动应用程序、内容等于字符串 "zone temps" 的消息时才发布。
我从主 run 函数中删除了 publish(client) 调用,并从 def publish(client): 中删除了 while 循环,这个思路对吗?感谢任何提示,非常感谢。我一定是遗漏了什么,因为运行下面这个修改后的版本时,双方之间完全没有任何消息交换。
def on_message(client, userdata, msg):
    """Print every received message and answer a "zone temps" request.

    Args:
        client: the client the message arrived on; used to send the reply.
        userdata: unused.
        msg: the received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print("Message received-> " + msg.topic + " " + str(msg.payload))
    # The payload is bytes: str() on it gives "b'zone temps'", which never
    # equals "zone temps". Decode it to text before comparing.
    request = msg.payload.decode("utf-8", errors="replace")
    if request == "zone temps":
        # Reply through the client directly: the publish(client) helper shown
        # in the original script takes one argument and loops forever, so it
        # cannot be called from inside a callback.
        client.publish(topic_to_publish, "avg=72.1;min=66.4;max=78.8")
def run():
    """Connect to the broker and process network traffic until interrupted.

    ``loop_forever()`` blocks the calling thread. With ``loop_start()`` alone
    this function returned immediately, the script reached its end, and the
    background network thread ended with it before any message was handled.
    """
    client = connect_mqtt()
    client.loop_forever()


if __name__ == '__main__':
    run()
我也是初学者,不过我会创建一个变量来标记当前是发布还是监听,例如:
phoneAppListener = 0
还有
if str(msg.payload) == "zone temps":
当我打印我的负载时,它看起来像:
b'payload'
首先你需要像这样拆分你的负载:
tempMsgHolder = str(msg.payload).split("'")
这样拆分之后,tempMsgHolder[1] 就是纯负载内容。(更稳妥的做法是用 msg.payload.decode() 把字节解码成字符串。)
if tempMsgHolder[1] == "zone temps": phoneAppListener = 1
phoneAppListener 的值决定当前状态:0 表示监听,1 表示发布。在你的发布循环中加入如下判断:
phoneAppListener == 1: publish your message
import random
import time
import threading
from paho.mqtt import client as mqtt_client
class moduleDatas:
    """Namespace holding the broker address, client ID, topics and credentials."""

    # Plain str / int values: parentheses without a trailing comma do not
    # create a tuple, so they are written without them.
    broker = 'test.mosquitto.org'
    port = 1883
    # Fixed client ID: the value is a constant, it is not generated randomly.
    client_id = "test_1"
    topic_to_publish = "laptop/publish"
    topic_to_listen = "mobile/publish"
    topic_to_wildcard = "testing/*"
    username = ""
    password = ""
# Create the client object.
# Further clients can be created with the same pattern, each with its own
# callbacks (on_message, etc.).
mqttClient_1 = mqtt_client.Client(moduleDatas.client_id)  # module-level client shared by the functions and callbacks below
def mqttClientConnect():
    """Connect ``mqttClient_1`` to the broker and start its network loop."""
    # broker is a str and port an int, so they are passed whole: indexing
    # them with [0] yields 't' for the host and raises TypeError for the port.
    mqttClient_1.connect(moduleDatas.broker, moduleDatas.port)
    # loop_start() runs the network loop in a background thread.
    mqttClient_1.loop_start()
@mqttClient_1.connect_callback()
def on_connect(client, userdata, flags, rc):
    """Report the outcome of the connection attempt on stdout."""
    if rc == 0:
        # Name the topic this script subscribes to (see mqttClientSubscribe).
        print(f"Connected to MQTT Broker on topic: {moduleDatas.topic_to_listen}")
    else:
        # print() does not apply %-formatting to extra arguments, so the
        # return code is interpolated explicitly.
        print(f"Failed to connect, return code {rc}\n")
@mqttClient_1.publish_callback()
def on_publish(client, userdata, mid):
    """Print the ``mid`` value passed to the publish callback.

    NOTE(review): in paho-mqtt ``mid`` is the message ID of the publish
    request, not a success flag, so it is not always 1 — confirm against the
    paho documentation.
    """
    print(mid)
@mqttClient_1.message_callback()
def on_message(client, userdata, message):
    """Reply on ``topic_to_publish`` when the payload is "zone temps"."""
    # Decode the bytes payload instead of splitting its repr ("b'...'") on
    # quotes, which gives wrong text for payloads containing an apostrophe
    # or non-ASCII bytes.
    request = message.payload.decode("utf-8", errors="replace")
    if request == "zone temps":
        msg = "hello world"  # <-- Your message here: a function result or plain text.
        mqttClient_1.publish(topic=moduleDatas.topic_to_publish, payload=msg, qos=0)
def mqttClientSubscribe():
    """Subscribe ``mqttClient_1`` to ``moduleDatas.topic_to_listen``."""
    listen_topic = moduleDatas.topic_to_listen
    mqttClient_1.subscribe(listen_topic)
def threadMqttClient1():
    """Thread entry point: connect the client, then subscribe."""
    for step in (mqttClientConnect, mqttClientSubscribe):
        step()
def buildThreads():
    """Start the MQTT worker thread(s) and keep the main thread alive.

    Never returns: the main thread idles after starting the workers.
    """
    # Pass the function object itself. Writing threadMqttClient1() here would
    # run it in the main thread and give Thread its return value (None) as
    # the target.
    threads = [threading.Thread(target=threadMqttClient1, daemon=True)]
    # More threads can be added to the list with the same pattern.
    for t in threads:
        t.start()
    # Main thread: idle by sleeping instead of spinning in an empty loop.
    while True:
        time.sleep(1)


if __name__ == "__main__":
    buildThreads()
我正在使用 Python 的 paho-mqtt 库,配合一个移动端 MQTT 客户端应用程序和 test.mosquitto.org 服务器/代理(broker)来试验 MQTT。
下面这个基本脚本在连接到 test.mosquitto.org 服务器后可以正常工作:我可以从移动端 MQTT 客户端应用程序向该脚本发布消息,脚本也会通过 def publish(client): 函数每 20 秒向移动应用程序发布一条测试消息。
import random
import time
from paho.mqtt import client as mqtt_client
# Connection settings for the test.mosquitto.org broker.
broker = 'test.mosquitto.org'
port = 1883
# Fixed client ID: the value is a constant, it is not generated randomly.
client_id = "test_1"
topic_to_publish = "laptop/publish"
topic_to_listen = "mobile/publish"
# NOTE: '*' is not an MQTT wildcard (the wildcards are '+' and '#'), so as a
# topic filter this would only match the literal topic "testing/*".
topic_to_wildcard = "testing/*"
username = ""
password = ""
def connect_mqtt():
    """Create the MQTT client, register its callbacks and connect.

    Returns:
        The ``mqtt_client.Client`` on which ``connect()`` has been called.
        The caller still has to run the network loop (``loop_start()`` or
        ``loop_forever()``) for the callbacks to fire.
    """
    def on_connect(client, userdata, flags, rc):
        # rc == 0 is the success branch of the original code.
        if rc == 0:
            client.subscribe(topic_to_listen)
            # Report the topic that was actually subscribed.
            print(f"Connected to MQTT Broker on topic: {topic_to_listen}")
        else:
            # print() does not apply %-formatting to extra arguments, so the
            # return code is interpolated explicitly.
            print(f"Failed to connect, return code {rc}\n")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    # Register each callback once, before connecting.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(broker, port)
    return client
def publish(client):
    """Publish a numbered greeting to ``topic_to_publish`` every 20 seconds.

    Never returns; the outcome of each attempt is printed to stdout.
    """
    sent = 0
    while True:
        time.sleep(20)
        text = f"hello from {client_id}: {sent}"
        outcome = client.publish(topic_to_publish, text)
        # The first element of the publish result is the status; 0 is
        # treated as success.
        if outcome[0] != 0:
            print(f"Failed to send message to topic {topic_to_publish}")
        else:
            print(f"Send {text} to topic {topic_to_publish}")
        sent += 1
def on_message(client, userdata, msg):
    """Callback for a PUBLISH message received from the server.

    Prints the message topic followed by ``str()`` of its payload.
    """
    details = " ".join((msg.topic, str(msg.payload)))
    print("Message received-> " + details)
def run():
    """Connect, start the background network loop, then publish forever."""
    mqtt = connect_mqtt()
    mqtt.loop_start()
    publish(mqtt)


if __name__ == '__main__':
    run()
有人可以告诉我如何修改 def publish(client): 函数吗?我希望它不再是每 20 秒发送一次消息的 while 循环,而是仅在收到来自移动应用程序、内容等于字符串 "zone temps" 的消息时才发布。
我从主 run 函数中删除了 publish(client) 调用,并从 def publish(client): 中删除了 while 循环,这个思路对吗?感谢任何提示,非常感谢。我一定是遗漏了什么,因为运行下面这个修改后的版本时,双方之间完全没有任何消息交换。
def on_message(client, userdata, msg):
    """Print every received message and answer a "zone temps" request.

    Args:
        client: the client the message arrived on; used to send the reply.
        userdata: unused.
        msg: the received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print("Message received-> " + msg.topic + " " + str(msg.payload))
    # The payload is bytes: str() on it gives "b'zone temps'", which never
    # equals "zone temps". Decode it to text before comparing.
    request = msg.payload.decode("utf-8", errors="replace")
    if request == "zone temps":
        # Reply through the client directly: the publish(client) helper shown
        # in the original script takes one argument and loops forever, so it
        # cannot be called from inside a callback.
        client.publish(topic_to_publish, "avg=72.1;min=66.4;max=78.8")
def run():
    """Connect to the broker and process network traffic until interrupted.

    ``loop_forever()`` blocks the calling thread. With ``loop_start()`` alone
    this function returned immediately, the script reached its end, and the
    background network thread ended with it before any message was handled.
    """
    client = connect_mqtt()
    client.loop_forever()


if __name__ == '__main__':
    run()
我也是初学者,不过我会创建一个变量来标记当前是发布还是监听,例如:
phoneAppListener = 0
还有
if str(msg.payload) == "zone temps":
当我打印我的负载时,它看起来像:
b'payload'
首先你需要像这样拆分你的负载:
tempMsgHolder = str(msg.payload).split("'")
这样拆分之后,tempMsgHolder[1] 就是纯负载内容。(更稳妥的做法是用 msg.payload.decode() 把字节解码成字符串。)
if tempMsgHolder[1] == "zone temps": phoneAppListener = 1
phoneAppListener 的值决定当前状态:0 表示监听,1 表示发布。在你的发布循环中加入如下判断:
phoneAppListener == 1: publish your message
import random
import time
import threading
from paho.mqtt import client as mqtt_client
class moduleDatas:
    """Namespace holding the broker address, client ID, topics and credentials."""

    # Plain str / int values: parentheses without a trailing comma do not
    # create a tuple, so they are written without them.
    broker = 'test.mosquitto.org'
    port = 1883
    # Fixed client ID: the value is a constant, it is not generated randomly.
    client_id = "test_1"
    topic_to_publish = "laptop/publish"
    topic_to_listen = "mobile/publish"
    topic_to_wildcard = "testing/*"
    username = ""
    password = ""
# Create the client object.
# Further clients can be created with the same pattern, each with its own
# callbacks (on_message, etc.).
mqttClient_1 = mqtt_client.Client(moduleDatas.client_id)  # module-level client shared by the functions and callbacks below
def mqttClientConnect():
    """Connect ``mqttClient_1`` to the broker and start its network loop."""
    # broker is a str and port an int, so they are passed whole: indexing
    # them with [0] yields 't' for the host and raises TypeError for the port.
    mqttClient_1.connect(moduleDatas.broker, moduleDatas.port)
    # loop_start() runs the network loop in a background thread.
    mqttClient_1.loop_start()
@mqttClient_1.connect_callback()
def on_connect(client, userdata, flags, rc):
    """Report the outcome of the connection attempt on stdout."""
    if rc == 0:
        # Name the topic this script subscribes to (see mqttClientSubscribe).
        print(f"Connected to MQTT Broker on topic: {moduleDatas.topic_to_listen}")
    else:
        # print() does not apply %-formatting to extra arguments, so the
        # return code is interpolated explicitly.
        print(f"Failed to connect, return code {rc}\n")
@mqttClient_1.publish_callback()
def on_publish(client, userdata, mid):
    """Print the ``mid`` value passed to the publish callback.

    NOTE(review): in paho-mqtt ``mid`` is the message ID of the publish
    request, not a success flag, so it is not always 1 — confirm against the
    paho documentation.
    """
    print(mid)
@mqttClient_1.message_callback()
def on_message(client, userdata, message):
    """Reply on ``topic_to_publish`` when the payload is "zone temps"."""
    # Decode the bytes payload instead of splitting its repr ("b'...'") on
    # quotes, which gives wrong text for payloads containing an apostrophe
    # or non-ASCII bytes.
    request = message.payload.decode("utf-8", errors="replace")
    if request == "zone temps":
        msg = "hello world"  # <-- Your message here: a function result or plain text.
        mqttClient_1.publish(topic=moduleDatas.topic_to_publish, payload=msg, qos=0)
def mqttClientSubscribe():
    """Subscribe ``mqttClient_1`` to ``moduleDatas.topic_to_listen``."""
    listen_topic = moduleDatas.topic_to_listen
    mqttClient_1.subscribe(listen_topic)
def threadMqttClient1():
    """Thread entry point: connect the client, then subscribe."""
    for step in (mqttClientConnect, mqttClientSubscribe):
        step()
def buildThreads():
    """Start the MQTT worker thread(s) and keep the main thread alive.

    Never returns: the main thread idles after starting the workers.
    """
    # Pass the function object itself. Writing threadMqttClient1() here would
    # run it in the main thread and give Thread its return value (None) as
    # the target.
    threads = [threading.Thread(target=threadMqttClient1, daemon=True)]
    # More threads can be added to the list with the same pattern.
    for t in threads:
        t.start()
    # Main thread: idle by sleeping instead of spinning in an empty loop.
    while True:
        time.sleep(1)


if __name__ == "__main__":
    buildThreads()