如何作为 mqtt 客户端累积消息 1 秒,然后将其保存到文件中
How to accumulate messages as mqtt client for 1 second, then save it to a file
我的问题如下:
我编写了一个订阅主题的程序,其中 2 个具有一个键的字典分别每秒到达更多次。在每条消息上,他们都会改变自己的价值。
我将这些词典保存在一个名为“Status”的大型缓冲词典中。我需要的是每秒将 Status 的“快照”保存到文件中。
我试过 time.sleep(1) 但它漂移了。由于已经存在的客户端循环,我不知道如何按计划处理问题...
我是 python 和 mqtt 的新手,非常感谢您的帮助
我的代码:
import paho.mqtt.client as mqtt
import time
import json
Status = {}
#create client instance
client = mqtt.Client(client_id=None, clean_session=True, transport="tcp")
#connect to broker
client.connect("my_broker", 1883)
#use subscribe() to subscribe to a topic and receive messages
client.subscribe("topic/#", qos=0)
def test1_callback(client, userdata, msg):
msg_dict = json.loads((msg.payload))
Status.update(msg_dict)
client.message_callback_add("topic/test1", test1_callback)
while True:
client.loop_start()
time.sleep(1)
client.loop_stop()
with open('Data.txt', 'a+') as file:
t = time.localtime()
Status["time"]= time.strftime("%H:%M:%S", t)
file.write(str(Status["time"]) + " ")
file.write(str(Status["key1"]) + " ")
file.write(str(Status["key2"]) + " ")
client.loop_start()
与其手动停止网络线程,不如使用每秒触发一次的计时器。此外,在将数据存储到文件时锁定数据可能是个好主意 - 否则可能会在两者之间发生更新:
# ...
import threading
def test1_callback(client, userdata, msg):
msg_dict = json.loads((msg.payload))
lock.acquire()
Status.update(msg_dict)
lock.release()
def timer_event():
lock.acquire()
# save to file here
lock.release()
# restart timer
threading.Timer(1, timer_event).start()
Status = {}
lock = threading.Lock()
# client initialization
# ...
client.loop_start()
threading.Timer(1, timer_event).start()
while True:
pass
但这不会阻止您的储值漂移,因为该主题显然发布得太频繁,以至于您的订阅者(甚至经纪人)无法足够快地处理消息。
因此您可能希望缩短发布此主题的时间间隔。另请注意,您订阅了一个 multi-level 主题 - 即使 "topic/test1"
之外的主题未在您的代码中处理,它们仍然会给代理和订阅客户端造成负载
我的问题如下: 我编写了一个订阅主题的程序,其中 2 个具有一个键的字典分别每秒到达更多次。在每条消息上,他们都会改变自己的价值。 我将这些词典保存在一个名为“Status”的大型缓冲词典中。我需要的是每秒将 Status 的“快照”保存到文件中。
我试过 time.sleep(1) 但它漂移了。由于已经存在的客户端循环,我不知道如何按计划处理问题...
我是 python 和 mqtt 的新手,非常感谢您的帮助
我的代码:
import paho.mqtt.client as mqtt
import time
import json
Status = {}
#create client instance
client = mqtt.Client(client_id=None, clean_session=True, transport="tcp")
#connect to broker
client.connect("my_broker", 1883)
#use subscribe() to subscribe to a topic and receive messages
client.subscribe("topic/#", qos=0)
def test1_callback(client, userdata, msg):
msg_dict = json.loads((msg.payload))
Status.update(msg_dict)
client.message_callback_add("topic/test1", test1_callback)
while True:
client.loop_start()
time.sleep(1)
client.loop_stop()
with open('Data.txt', 'a+') as file:
t = time.localtime()
Status["time"]= time.strftime("%H:%M:%S", t)
file.write(str(Status["time"]) + " ")
file.write(str(Status["key1"]) + " ")
file.write(str(Status["key2"]) + " ")
client.loop_start()
与其手动停止网络线程,不如使用每秒触发一次的计时器。此外,在将数据存储到文件时锁定数据可能是个好主意 - 否则可能会在两者之间发生更新:
# ...
import threading
def test1_callback(client, userdata, msg):
msg_dict = json.loads((msg.payload))
lock.acquire()
Status.update(msg_dict)
lock.release()
def timer_event():
lock.acquire()
# save to file here
lock.release()
# restart timer
threading.Timer(1, timer_event).start()
Status = {}
lock = threading.Lock()
# client initialization
# ...
client.loop_start()
threading.Timer(1, timer_event).start()
while True:
pass
但这不会阻止您的储值漂移,因为该主题显然发布得太频繁,以至于您的订阅者(甚至经纪人)无法足够快地处理消息。
因此您可能希望缩短发布此主题的时间间隔。另请注意,您订阅了一个 multi-level 主题 - 即使 "topic/test1"
之外的主题未在您的代码中处理,它们仍然会给代理和订阅客户端造成负载