在 paho-mqtt 中订阅和发布多个主题的最佳方式是什么?

What is the best way to subscribe and publish to multiple topics in paho-mqtt?


我正在使用 MQTT 来干扰我网络中的消息,并且对向代理发布和订阅多条消息的最简洁方式有疑问。
首先,我有两个列表:

request_list = [('sensors/system/temperature', 0), 
                ('sensors/system/gyroscope', 1), 
                ('sensors/system/acceleration', 2)]

其中包含我必须发布消息的主题。
我的第二个列表定义了我想要发布的消息和我想要得到回复的主题(== 我必须订阅才能得到答案的主题)。

request_value = ['{"response":"similarity/sensors/system/temperature","duration":"60s"}',
                  {"response":"similarity/sensors/system/gyroscope","duration":"60s"}', 
                 '{"response":"similarity/sensors/system/acceleration","duration":"60s"}'] 


我的经纪人对于每个主题都是相同的,并且在 PORT =“8083”上用 HOST=“192.168.137.1”定义。
现在我正在使用 for 循环,订阅一个主题,发布我的消息并等待消息进来。因为我必须等待每次订阅和发布才能成功,这非常耗时。我当前代码的伪代码如下所示:

list_measurments = []
for topic in request_list:
    client.connect("Broker","Host")
    client.loop_start() 
    client.subscribe("sub_topic")
    client.pub("pub_topic","pub_message")
    client.callback("append list_measurements")
    client.loop_stop() #stop the loop
    client.disconnect

我尝试使用线程形成我的问题 here but it turned out that the normal use of threads would be to publish the same message to a lot of different brokers. I also thought about multiple subscribtions
如果有人能给我一个提示,最干净和最快的方法是什么,我将非常感激。

您应该只在 for 循环之外连接到代理并启动客户端循环。

每次都建立和断开与代理的连接会增加大量的开销,并留下很大的空间来丢失消息。

您还应该在启动时订阅所有您想要的主题。好的,您可以根据需要添加更多或取消订阅,但如果它们始终相同,请在连接时订阅。

基本的通用方法应该如下所示。

def on_connect(client, userdata, flags, rc):
  for i in request_value:
    client.subscribe(i.response)

  for i in request_list:
    //this loop needs to be a bit more complicated
    //as you need to pull the topic from the request_list
    //and the payload from request_value
    client.publish(i)

def on_message(client, userdata, message):
  if message.topic == "similarity/sensors/system/temperature":
    //update temperature
  elif message.topic == "similarity/sensors/system/gyroscope":
    //update gyro
  elif message.topic == "similarity/sensors/system/acceleration":
    //update accel

client.on_connect = on_connect
client.on_message = on_message
client.connect("BrokerIP")
client.loop_start()

如果需要,您还可以再次 运行 发布循环(因为看起来您一次只请求 60 秒的数据)。将 request_listrequest_value 数据结构合并到一个列表中可能会更好。