Paho MQTT 动态订阅新主题
Paho MQTT dynamically subscribe to new topic
我还在学习如何使用MQTT发布和订阅。
如果用户添加新主题,是否可以动态订阅主题?
比如现在的代码是这样的:
import anotherModule
topic = anotherModule.topic #the output is similar to this -> [('Test1', 0), ('Test2', 0)]
...
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
new_msg = json.loads(msg.payload.decode())
print(new_msg)
def check_topic():
while True:
client.subscribe(anotherModule.topic) #subscribe to the new topic if there are any
time.sleep(1)
t3 = threading.Thread(target=check_topic)
t3.start()
# client.subscribe(topic)
client.on_message = on_message
client = connect_mqtt()
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection) #this function is irrelvant to the question
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
现在,我正在使用线程 运行 在 while 循环中订阅另一个线程。这样,如果用户添加,它将始终订阅任何新主题。
有没有更好的方法来做到这一点?如果用户说订阅 'Test3',('Test3', 0) 将附加在 anotherModule.topic 中。但是,mqtt无法动态订阅这个新主题,因此引入了while循环。
感谢任何帮助。谢谢!
编辑 1:
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
client.subscribe(anotherModule.topic) # I tried putting here but wont work
return client
def update_topic(new_topic):
client.subscribe(new_topic) # How to run client.subscribe here?
def run():
global client
client = connect_mqtt()
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection)
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
我试过在客户端添加 global 但它似乎不起作用。它说 name client is not defined for client.subscribe(new_topic)
是的,
您可以在建立连接后随时调用 client.subscribe()
,这将告诉代理将匹配模式的消息发送给该客户端。
您也可以调用 client.unsubscribe()
停止接收给定主题模式的消息。
在循环中调用 client.subscribe()
没有任何用处,每个主题只需要调用一次,订阅详细信息由代理而不是客户端维护。
“othermodule”应该只调用此模块上的一个函数,该函数传递它感兴趣的主题(以及可选的 QOS),然后您调用 client.subscribe(topic)
但值得指出的是,只有 1 个 client.on_message
回调,如果您尝试多次设置此回调(例如,每次调用订阅),您只会覆盖最后一个。
我认为最优雅的方法是使用订阅通配符并正确设计已发布的主题路径。因此,您可能有一些 top-level 主题 Test
,例如和未知数量的 sub-topics 像这样:
Tests/Test1
Tests/Test2
Tests/Test3
...
订阅客户端只需订阅Tests/+
,自动获取所有sub-topics,无需重新订阅
您在特殊主题的有效负载内提供新主题的方法也应该有效,但不需要无限循环订阅。为此,您可能有一个列表来管理您当前订阅的主题,如果您的主题名称不在此列表中,则只需调用 client.subscribe
。
我已经设法通过将另一个模块设为 class 来解决这个问题。我已经实例化了 anotherModule 并从当前模块传入客户端,这样我就可以从 anotherModule 调用 client.subscribe 而不是调用函数。感谢您的帮助!
# current module
def run():
client = connect_mqtt()
anotherModule.ModuleClass(client) #client is pass to another module
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection)
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
# anotherModule
class ModuleClass:
def __init__(self, client):
self.client = client
....
def updateTopic(new_topic):
self.client.subscribe(new_topic)
我还在学习如何使用MQTT发布和订阅。
如果用户添加新主题,是否可以动态订阅主题?
比如现在的代码是这样的:
import anotherModule
topic = anotherModule.topic #the output is similar to this -> [('Test1', 0), ('Test2', 0)]
...
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
new_msg = json.loads(msg.payload.decode())
print(new_msg)
def check_topic():
while True:
client.subscribe(anotherModule.topic) #subscribe to the new topic if there are any
time.sleep(1)
t3 = threading.Thread(target=check_topic)
t3.start()
# client.subscribe(topic)
client.on_message = on_message
client = connect_mqtt()
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection) #this function is irrelvant to the question
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
现在,我正在使用线程 运行 在 while 循环中订阅另一个线程。这样,如果用户添加,它将始终订阅任何新主题。
有没有更好的方法来做到这一点?如果用户说订阅 'Test3',('Test3', 0) 将附加在 anotherModule.topic 中。但是,mqtt无法动态订阅这个新主题,因此引入了while循环。
感谢任何帮助。谢谢!
编辑 1:
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
client.subscribe(anotherModule.topic) # I tried putting here but wont work
return client
def update_topic(new_topic):
client.subscribe(new_topic) # How to run client.subscribe here?
def run():
global client
client = connect_mqtt()
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection)
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
我试过在客户端添加 global 但它似乎不起作用。它说 name client is not defined for client.subscribe(new_topic)
是的,
您可以在建立连接后随时调用 client.subscribe()
,这将告诉代理将匹配模式的消息发送给该客户端。
您也可以调用 client.unsubscribe()
停止接收给定主题模式的消息。
在循环中调用 client.subscribe()
没有任何用处,每个主题只需要调用一次,订阅详细信息由代理而不是客户端维护。
“othermodule”应该只调用此模块上的一个函数,该函数传递它感兴趣的主题(以及可选的 QOS),然后您调用 client.subscribe(topic)
但值得指出的是,只有 1 个 client.on_message
回调,如果您尝试多次设置此回调(例如,每次调用订阅),您只会覆盖最后一个。
我认为最优雅的方法是使用订阅通配符并正确设计已发布的主题路径。因此,您可能有一些 top-level 主题 Test
,例如和未知数量的 sub-topics 像这样:
Tests/Test1
Tests/Test2
Tests/Test3
...
订阅客户端只需订阅Tests/+
,自动获取所有sub-topics,无需重新订阅
您在特殊主题的有效负载内提供新主题的方法也应该有效,但不需要无限循环订阅。为此,您可能有一个列表来管理您当前订阅的主题,如果您的主题名称不在此列表中,则只需调用 client.subscribe
。
我已经设法通过将另一个模块设为 class 来解决这个问题。我已经实例化了 anotherModule 并从当前模块传入客户端,这样我就可以从 anotherModule 调用 client.subscribe 而不是调用函数。感谢您的帮助!
# current module
def run():
client = connect_mqtt()
anotherModule.ModuleClass(client) #client is pass to another module
subscribe(client)
t1 = threading.Thread(target=periodic_check_connection)
t1.start()
t2 = threading.Thread(target=client.loop_forever)
t2.start()
# anotherModule
class ModuleClass:
def __init__(self, client):
self.client = client
....
def updateTopic(new_topic):
self.client.subscribe(new_topic)