在 mqtt paho 回调函数中发出 运行 个 self 对象

Issue running self objects in mqtt paho callback function

我正在尝试编写一个脚本来保存 mqtt 数据并将其发送到 influxDB。我遇到的问题是 mqtt-paho 模块的回调函数一直报错: AttributeError: 'Client' object has no attribute 'write_api'。我认为这是因为 mqtt-paho 内部 'Client' class 中的 self。我的完整脚本可以在下面找到:

# Imported modules 
# standard time module
from datetime import datetime
import time
# InfluxDB specific modules 
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

#MQTT paho specific modules
import paho.mqtt.client as mqtt



class data_handler(): # Default namespaces are just for all the ESPs.


    def __init__(self, namespace_list=["ESP01","ESP02","ESP03","ESP04","ESP05","ESP06","ESP07","ESP08"]):
        
        # initialize influxdb client and define access token and data bucket
        token = "XXXXXXXXXX" # robotlab's token
        self.org = "Home"   
        self.bucket = "HomeSensors"
        self.flux_client = InfluxDBClient(url="http://localhost:8086", token=token)
        self.write_api = self.flux_client.write_api(write_options=SYNCHRONOUS)
        
        # Initialize and establish connection to MQTT broker 
        broker_address="XXX.XXX.XXX.XXX"
        self.mqtt_client = mqtt.Client("influx_client") #create new instance
        self.mqtt_client.on_message=data_handler.mqtt_message #attach function to callback
        self.mqtt_client.connect(broker_address) #connect to broker

        # Define list of namespaces
        self.namespace_list = namespace_list
        print(self.namespace_list)


    def mqtt_message(self, client, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)
        
        sequence = [message.topic, message.payload.decode("utf-8")]
        self.write_api.write(self.bucket, self.org, sequence)

    def mqtt_listener(self):

        for namespace in self.namespace_list:
            self.mqtt_client.loop_start() #start the loop
            print("Subscribing to topics!")
            message = namespace+"/#"
            self.mqtt_client.subscribe(message, 0)
            time.sleep(4) # wait
            self.mqtt_client.loop_stop() #stop the loop


def main():
    influxHandler = data_handler(["ESP07"])
    influxHandler.mqtt_listener()

if  __name__ == '__main__':
    main()

代码工作正常,直到我在回调函数中添加 self.someVariable。解决这个问题的好方法是什么?我真的不想制作全局变量,因此我选择使用 class.

提前致谢!

当涉及多个 class 时处理 self 可能会让人感到困惑。 paho 库调用 on_message as follows:

on_message(self, self._userdata, message)

所以传递的第一个参数是 Client 的实例,所以你看到的是预期的(在没有任何 classes 的情况下)。

如果回调是 method object(这似乎是您的目标)“实例对象作为函数的第一个参数传递”。这意味着您的函数将采用四个参数并且定义为:

mqtt_message(self, client, userdata, msg)

基于此,您可能希望您的应用程序比实际更早失败,但让我们看看您是如何设置回调的:

self.mqtt_client.on_message=data_handler.mqtt_message

datahandler 是 class 本身,而不是 class 的实例。这意味着您有效地将回调设置为静态函数(不绑定到 class - 的任何实例可能会有所帮助)。您需要将其更改为:

self.mqtt_client.on_message=self.mqtt_message

然而,这将不起作用,因为该方法目前只接受三个参数;将定义更新为:

def mqtt_message(self, client, userdata, msg)

通过这些更改,我相信这会奏效(或者至少您会发现另一个问题 :-))。

An example 可能是更好的解释方式:

class mqtt_sim():
  def __init__(self):
    self._on_message = None
    
  @property
  def on_message(self):
     return self._on_message
      
  @on_message.setter
  def on_message(self, func):
    self._on_message = func

# This is what you are doing    
class data_handler1(): # Default namespaces are just for all the ESPs.
   def __init__(self):
      self.mqtt = mqtt_sim()      
      self.mqtt.on_message = data_handler1.mqtt_message    # xxxxx         
   def mqtt_message(self, client, message):        
      print("mqtt_message1", self, client, message) 

# This is what you should be doing      
class data_handler2(): # Default namespaces are just for all the ESPs.
   def __init__(self):
      self.mqtt = mqtt_sim()      
      self.mqtt.on_message = self.mqtt_message #attach function to callback             
   def mqtt_message(self, mqttself, client, message):        
      print("mqtt_message2", self, mqttself, client, message)    

# Lets try using both of the above      
d = data_handler1()
d.mqtt._on_message("self", "userdata", "message")

d = data_handler2()
d.mqtt._on_message("self", "userdata", "message")