在 mqtt paho 回调函数中发出 运行 个 self 对象
Issue running self objects in mqtt paho callback function
我正在尝试编写一个脚本来保存 mqtt 数据并将其发送到 influxDB。我遇到的问题是 mqtt-paho 模块的回调函数一直报错:
AttributeError: 'Client' object has no attribute 'write_api'
。我认为这是因为 mqtt-paho 内部 'Client' class 中的 self
。我的完整脚本可以在下面找到:
# Imported modules
# standard time module
from datetime import datetime
import time
# InfluxDB specific modules
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
#MQTT paho specific modules
import paho.mqtt.client as mqtt
class data_handler(): # Default namespaces are just for all the ESPs.
def __init__(self, namespace_list=["ESP01","ESP02","ESP03","ESP04","ESP05","ESP06","ESP07","ESP08"]):
# initialize influxdb client and define access token and data bucket
token = "XXXXXXXXXX" # robotlab's token
self.org = "Home"
self.bucket = "HomeSensors"
self.flux_client = InfluxDBClient(url="http://localhost:8086", token=token)
self.write_api = self.flux_client.write_api(write_options=SYNCHRONOUS)
# Initialize and establish connection to MQTT broker
broker_address="XXX.XXX.XXX.XXX"
self.mqtt_client = mqtt.Client("influx_client") #create new instance
self.mqtt_client.on_message=data_handler.mqtt_message #attach function to callback
self.mqtt_client.connect(broker_address) #connect to broker
# Define list of namespaces
self.namespace_list = namespace_list
print(self.namespace_list)
def mqtt_message(self, client, message):
print("message received " ,str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("message qos=",message.qos)
print("message retain flag=",message.retain)
sequence = [message.topic, message.payload.decode("utf-8")]
self.write_api.write(self.bucket, self.org, sequence)
def mqtt_listener(self):
for namespace in self.namespace_list:
self.mqtt_client.loop_start() #start the loop
print("Subscribing to topics!")
message = namespace+"/#"
self.mqtt_client.subscribe(message, 0)
time.sleep(4) # wait
self.mqtt_client.loop_stop() #stop the loop
def main():
influxHandler = data_handler(["ESP07"])
influxHandler.mqtt_listener()
if __name__ == '__main__':
main()
代码工作正常,直到我在回调函数中添加 self.someVariable
。解决这个问题的好方法是什么?我真的不想制作全局变量,因此我选择使用 class.
提前致谢!
当涉及多个 class 时处理 self
可能会让人感到困惑。 paho 库调用 on_message
as follows:
on_message(self, self._userdata, message)
所以传递的第一个参数是 Client
的实例,所以你看到的是预期的(在没有任何 classes 的情况下)。
如果回调是 method object(这似乎是您的目标)“实例对象作为函数的第一个参数传递”。这意味着您的函数将采用四个参数并且定义为:
mqtt_message(self, client, userdata, msg)
基于此,您可能希望您的应用程序比实际更早失败,但让我们看看您是如何设置回调的:
self.mqtt_client.on_message=data_handler.mqtt_message
datahandler
是 class 本身,而不是 class 的实例。这意味着您有效地将回调设置为静态函数(不绑定到 class - 的任何实例可能会有所帮助)。您需要将其更改为:
self.mqtt_client.on_message=self.mqtt_message
然而,这将不起作用,因为该方法目前只接受三个参数;将定义更新为:
def mqtt_message(self, client, userdata, msg)
通过这些更改,我相信这会奏效(或者至少您会发现另一个问题 :-))。
An example 可能是更好的解释方式:
class mqtt_sim():
def __init__(self):
self._on_message = None
@property
def on_message(self):
return self._on_message
@on_message.setter
def on_message(self, func):
self._on_message = func
# This is what you are doing
class data_handler1(): # Default namespaces are just for all the ESPs.
def __init__(self):
self.mqtt = mqtt_sim()
self.mqtt.on_message = data_handler1.mqtt_message # xxxxx
def mqtt_message(self, client, message):
print("mqtt_message1", self, client, message)
# This is what you should be doing
class data_handler2(): # Default namespaces are just for all the ESPs.
def __init__(self):
self.mqtt = mqtt_sim()
self.mqtt.on_message = self.mqtt_message #attach function to callback
def mqtt_message(self, mqttself, client, message):
print("mqtt_message2", self, mqttself, client, message)
# Lets try using both of the above
d = data_handler1()
d.mqtt._on_message("self", "userdata", "message")
d = data_handler2()
d.mqtt._on_message("self", "userdata", "message")
我正在尝试编写一个脚本来保存 mqtt 数据并将其发送到 influxDB。我遇到的问题是 mqtt-paho 模块的回调函数一直报错:
AttributeError: 'Client' object has no attribute 'write_api'
。我认为这是因为 mqtt-paho 内部 'Client' class 中的 self
。我的完整脚本可以在下面找到:
# Imported modules
# standard time module
from datetime import datetime
import time
# InfluxDB specific modules
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
#MQTT paho specific modules
import paho.mqtt.client as mqtt
class data_handler(): # Default namespaces are just for all the ESPs.
def __init__(self, namespace_list=["ESP01","ESP02","ESP03","ESP04","ESP05","ESP06","ESP07","ESP08"]):
# initialize influxdb client and define access token and data bucket
token = "XXXXXXXXXX" # robotlab's token
self.org = "Home"
self.bucket = "HomeSensors"
self.flux_client = InfluxDBClient(url="http://localhost:8086", token=token)
self.write_api = self.flux_client.write_api(write_options=SYNCHRONOUS)
# Initialize and establish connection to MQTT broker
broker_address="XXX.XXX.XXX.XXX"
self.mqtt_client = mqtt.Client("influx_client") #create new instance
self.mqtt_client.on_message=data_handler.mqtt_message #attach function to callback
self.mqtt_client.connect(broker_address) #connect to broker
# Define list of namespaces
self.namespace_list = namespace_list
print(self.namespace_list)
def mqtt_message(self, client, message):
print("message received " ,str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("message qos=",message.qos)
print("message retain flag=",message.retain)
sequence = [message.topic, message.payload.decode("utf-8")]
self.write_api.write(self.bucket, self.org, sequence)
def mqtt_listener(self):
for namespace in self.namespace_list:
self.mqtt_client.loop_start() #start the loop
print("Subscribing to topics!")
message = namespace+"/#"
self.mqtt_client.subscribe(message, 0)
time.sleep(4) # wait
self.mqtt_client.loop_stop() #stop the loop
def main():
influxHandler = data_handler(["ESP07"])
influxHandler.mqtt_listener()
if __name__ == '__main__':
main()
代码工作正常,直到我在回调函数中添加 self.someVariable
。解决这个问题的好方法是什么?我真的不想制作全局变量,因此我选择使用 class.
提前致谢!
当涉及多个 class 时处理 self
可能会让人感到困惑。 paho 库调用 on_message
as follows:
on_message(self, self._userdata, message)
所以传递的第一个参数是 Client
的实例,所以你看到的是预期的(在没有任何 classes 的情况下)。
如果回调是 method object(这似乎是您的目标)“实例对象作为函数的第一个参数传递”。这意味着您的函数将采用四个参数并且定义为:
mqtt_message(self, client, userdata, msg)
基于此,您可能希望您的应用程序比实际更早失败,但让我们看看您是如何设置回调的:
self.mqtt_client.on_message=data_handler.mqtt_message
datahandler
是 class 本身,而不是 class 的实例。这意味着您有效地将回调设置为静态函数(不绑定到 class -
self.mqtt_client.on_message=self.mqtt_message
然而,这将不起作用,因为该方法目前只接受三个参数;将定义更新为:
def mqtt_message(self, client, userdata, msg)
通过这些更改,我相信这会奏效(或者至少您会发现另一个问题 :-))。
An example 可能是更好的解释方式:
class mqtt_sim():
def __init__(self):
self._on_message = None
@property
def on_message(self):
return self._on_message
@on_message.setter
def on_message(self, func):
self._on_message = func
# This is what you are doing
class data_handler1(): # Default namespaces are just for all the ESPs.
def __init__(self):
self.mqtt = mqtt_sim()
self.mqtt.on_message = data_handler1.mqtt_message # xxxxx
def mqtt_message(self, client, message):
print("mqtt_message1", self, client, message)
# This is what you should be doing
class data_handler2(): # Default namespaces are just for all the ESPs.
def __init__(self):
self.mqtt = mqtt_sim()
self.mqtt.on_message = self.mqtt_message #attach function to callback
def mqtt_message(self, mqttself, client, message):
print("mqtt_message2", self, mqttself, client, message)
# Lets try using both of the above
d = data_handler1()
d.mqtt._on_message("self", "userdata", "message")
d = data_handler2()
d.mqtt._on_message("self", "userdata", "message")