如何使用 Flask/python 在特定时间间隔内记录物联网数据
How to Log IoT Data in specific time interval using Flask/python
这是我的第一个问题,如有错误请见谅。我正在尝试使用 python/flask 开发软件,它不断从多个设备接收物联网数据,并在特定时间间隔记录数据。例如,我有 3 个设备并以 30 秒的间隔记录数据:device1、device2、device3 分别在 5:04:20、5:04:29、5;04:31 发送第一个数据。然后这些设备每隔 1 或 2 秒连续发送数据,我想跟踪上一个数据并确保下一个数据分别在 5:04:50、5:04:59、5:05:01 之后更新在 5:05:20 等等。
我已经编写了一个脚本来确保使用线程的单个设备做到这一点:
这是代码:
import paho.mqtt.client as mqtt
import csv
import os
import datetime
import threading
import queue
import time
q = queue.Queue()
header = ["Date", "Time", "Real_Speed"]
Rspd_key_1 = "key1="
Rspd_key_2 = "key2="
state = 0
message = ""
values = {"Date": 0, "Time": 0, "Real_Speed": 0}
writeFlag = True
logTime = 0
locallog = 0
nowTime = 0
dataUpdated = False
F_checkTime = True
prev_spd = 9999
def checkTime():
global logTime
global locallog
global values
global dataUpdated
timesDataMissed = 0
while (F_checkTime):
nowTime = time.time()
if(logTime != 0 and nowTime-logTime >= 30):
values['Date'] = datetime.date.today().strftime("%d/%m/%Y")
now = datetime.datetime.now()
values['Time'] = now.strftime("%H:%M:%S")
if(dataUpdated):
q.put(values)
logTime +=30
dataUpdated = False
print(values)
timesDataMissed=0
else:
values['Real_Speed'] = 'NULL'
q.put(values)
logTime = nowTime
dataUpdated = False
timesDataMissed += 1
print(values)
if(timesDataMissed > 10):
timesDataMissed = 0
logTime = 0
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("something")
def write_csv():
csvfile = open('InverterDataLogger01.csv', mode='w',
newline='', encoding='utf-8')
spamwriter = csv.DictWriter(csvfile, fieldnames=header)
spamwriter.writeheader()
csvfile.close()
while writeFlag:
# print("worker running ",csv_flag)
time.sleep(0.01)
# time.sleep(2)
while not q.empty():
results = q.get()
if results is None:
continue
csvfile = open('InverterDataLogger01.csv', mode='a',
newline='', encoding='utf-8')
spamwriter = csv.DictWriter(csvfile, fieldnames=header)
spamwriter.writerow(results)
csvfile.close()
print("Written in csv File")
def find_spd_val(message):
Do Something
return realspd
def on_message(client, userdata, msg):
message = str(msg.payload.decode("utf-8", "ignore"))
topic = str(msg.topic)
global values
global dataUpdated
global r_index
global prev_spd
global rspd
global locallog
if(logTime==0):
global logTime
logTime = time.time()
locallog=logTime
else:
try:
rspd = int(find_spd_val(message))
except:
pass
if(prev_spd == 9999):
prev_spd = rspd
else:
values['Real_Speed'] = rspd
def on_publish(client, userdata, mid):
print("Message Published")
client = mqtt.Client("hidden")
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect("hidden")
client.loop_start()
t1 = threading.Thread(target=write_csv) # start logger
t2 = threading.Thread(target=checkTime) # start logger
t1.start() # start logging thread
t2.start() # start logging thread
print('written')
try:
while True:
time.sleep(1)
pass
except:
print("interrrupted by keyboard")
client.loop_stop() # start loop
writeFlag = False # stop logging thread
F_checkTime = False
time.sleep(5)
我想用 python/flask 做同样的事情来处理多个设备。我是 Flask 的新手,能否请您给我一些指导,我怎样才能确保 Flask 中的这个功能,我应该使用什么技术?
我认为要正确回答您的问题需要更多上下文。但简单地说,您的设备可以发出 http 请求吗?如果可能的话,您可以创建一个 flask 网络应用程序来接收 http 调用并存储信息。我还看到你提到了 csv,这不是存储数据的好方法,因为依赖 read/write 文件不是一个好习惯。我建议使用适当的数据库(例如 mysql 等)以事务方式存储信息。
我将提出一个简单的解决方案,避免需要 运行 任何计时器,代价是延迟写入文件一秒钟左右(这是否是一个问题取决于您的要求).
来自设备的数据可以存储在类似于(一个非常粗略的示例!)的结构中:
from datetime import datetime
dev_info = {
'sensor1': {'last_value': .310, 'last_receive': datetime(2021, 8, 28, 12, 8, 1, 1234), 'last_write': datetime(2021, 8, 28, 12, 8, 0, 541)},
'sensor2': {'last_value': 5.2, 'last_receive': datetime(2021, 8, 28, 12, 7, 59, 1234), 'last_write': datetime(2021, 8, 28, 12, 7, 58, 921)}
}
每次收到新样本时(您可以为此使用单个订阅并通过检查 on_message
中的消息主题来确定消息来自哪个设备):
- 从结构中检索设备的
last_write
时间
- 如果它超过所需的时间间隔,则将
last_value
写入您的 CSV(使用时间戳 last_write
+ 时间间隔)并更新 last_write
(需要一些逻辑在这里;考虑一下如果一分钟内没有收到任何信息会发生什么。
- 更新结构中设备的信息 (
last_value
/ last_receive
)。
正如我之前提到的那样,这样做的缺点是只有在您收到超出所需时间的新值后才会写出该值window;然而,对于许多用例来说,这很好,并且比使用计时器要简单得多。如果您需要更频繁的写入,那么您可以定期扫描结构中的旧数据并将其写出。
您可能还需要考虑其他一些因素:
- MQTT 不保证实时交付(尤其是在 QOS 1+ 时)。
- 物联网单元的通信通常不稳定,因此使用 QOS1+(和
clean_session=False
)值得考虑。
- 基于以上所述,您可能需要考虑在消息中嵌入时间戳(但这确实需要保持远程设备时钟同步)。
- 存储很便宜 - 您确定存储所有接收到的数据然后再进行下采样没有任何好处吗?
这是我的第一个问题,如有错误请见谅。我正在尝试使用 python/flask 开发软件,它不断从多个设备接收物联网数据,并在特定时间间隔记录数据。例如,我有 3 个设备并以 30 秒的间隔记录数据:device1、device2、device3 分别在 5:04:20、5:04:29、5;04:31 发送第一个数据。然后这些设备每隔 1 或 2 秒连续发送数据,我想跟踪上一个数据并确保下一个数据分别在 5:04:50、5:04:59、5:05:01 之后更新在 5:05:20 等等。
我已经编写了一个脚本来确保使用线程的单个设备做到这一点: 这是代码:
import paho.mqtt.client as mqtt
import csv
import os
import datetime
import threading
import queue
import time
q = queue.Queue()
header = ["Date", "Time", "Real_Speed"]
Rspd_key_1 = "key1="
Rspd_key_2 = "key2="
state = 0
message = ""
values = {"Date": 0, "Time": 0, "Real_Speed": 0}
writeFlag = True
logTime = 0
locallog = 0
nowTime = 0
dataUpdated = False
F_checkTime = True
prev_spd = 9999
def checkTime():
global logTime
global locallog
global values
global dataUpdated
timesDataMissed = 0
while (F_checkTime):
nowTime = time.time()
if(logTime != 0 and nowTime-logTime >= 30):
values['Date'] = datetime.date.today().strftime("%d/%m/%Y")
now = datetime.datetime.now()
values['Time'] = now.strftime("%H:%M:%S")
if(dataUpdated):
q.put(values)
logTime +=30
dataUpdated = False
print(values)
timesDataMissed=0
else:
values['Real_Speed'] = 'NULL'
q.put(values)
logTime = nowTime
dataUpdated = False
timesDataMissed += 1
print(values)
if(timesDataMissed > 10):
timesDataMissed = 0
logTime = 0
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("something")
def write_csv():
csvfile = open('InverterDataLogger01.csv', mode='w',
newline='', encoding='utf-8')
spamwriter = csv.DictWriter(csvfile, fieldnames=header)
spamwriter.writeheader()
csvfile.close()
while writeFlag:
# print("worker running ",csv_flag)
time.sleep(0.01)
# time.sleep(2)
while not q.empty():
results = q.get()
if results is None:
continue
csvfile = open('InverterDataLogger01.csv', mode='a',
newline='', encoding='utf-8')
spamwriter = csv.DictWriter(csvfile, fieldnames=header)
spamwriter.writerow(results)
csvfile.close()
print("Written in csv File")
def find_spd_val(message):
Do Something
return realspd
def on_message(client, userdata, msg):
message = str(msg.payload.decode("utf-8", "ignore"))
topic = str(msg.topic)
global values
global dataUpdated
global r_index
global prev_spd
global rspd
global locallog
if(logTime==0):
global logTime
logTime = time.time()
locallog=logTime
else:
try:
rspd = int(find_spd_val(message))
except:
pass
if(prev_spd == 9999):
prev_spd = rspd
else:
values['Real_Speed'] = rspd
def on_publish(client, userdata, mid):
print("Message Published")
client = mqtt.Client("hidden")
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect("hidden")
client.loop_start()
t1 = threading.Thread(target=write_csv) # start logger
t2 = threading.Thread(target=checkTime) # start logger
t1.start() # start logging thread
t2.start() # start logging thread
print('written')
try:
while True:
time.sleep(1)
pass
except:
print("interrrupted by keyboard")
client.loop_stop() # start loop
writeFlag = False # stop logging thread
F_checkTime = False
time.sleep(5)
我想用 python/flask 做同样的事情来处理多个设备。我是 Flask 的新手,能否请您给我一些指导,我怎样才能确保 Flask 中的这个功能,我应该使用什么技术?
我认为要正确回答您的问题需要更多上下文。但简单地说,您的设备可以发出 http 请求吗?如果可能的话,您可以创建一个 flask 网络应用程序来接收 http 调用并存储信息。我还看到你提到了 csv,这不是存储数据的好方法,因为依赖 read/write 文件不是一个好习惯。我建议使用适当的数据库(例如 mysql 等)以事务方式存储信息。
我将提出一个简单的解决方案,避免需要 运行 任何计时器,代价是延迟写入文件一秒钟左右(这是否是一个问题取决于您的要求).
来自设备的数据可以存储在类似于(一个非常粗略的示例!)的结构中:
from datetime import datetime
dev_info = {
'sensor1': {'last_value': .310, 'last_receive': datetime(2021, 8, 28, 12, 8, 1, 1234), 'last_write': datetime(2021, 8, 28, 12, 8, 0, 541)},
'sensor2': {'last_value': 5.2, 'last_receive': datetime(2021, 8, 28, 12, 7, 59, 1234), 'last_write': datetime(2021, 8, 28, 12, 7, 58, 921)}
}
每次收到新样本时(您可以为此使用单个订阅并通过检查 on_message
中的消息主题来确定消息来自哪个设备):
- 从结构中检索设备的
last_write
时间 - 如果它超过所需的时间间隔,则将
last_value
写入您的 CSV(使用时间戳last_write
+ 时间间隔)并更新last_write
(需要一些逻辑在这里;考虑一下如果一分钟内没有收到任何信息会发生什么。 - 更新结构中设备的信息 (
last_value
/last_receive
)。
正如我之前提到的那样,这样做的缺点是只有在您收到超出所需时间的新值后才会写出该值window;然而,对于许多用例来说,这很好,并且比使用计时器要简单得多。如果您需要更频繁的写入,那么您可以定期扫描结构中的旧数据并将其写出。
您可能还需要考虑其他一些因素:
- MQTT 不保证实时交付(尤其是在 QOS 1+ 时)。
- 物联网单元的通信通常不稳定,因此使用 QOS1+(和
clean_session=False
)值得考虑。 - 基于以上所述,您可能需要考虑在消息中嵌入时间戳(但这确实需要保持远程设备时钟同步)。
- 存储很便宜 - 您确定存储所有接收到的数据然后再进行下采样没有任何好处吗?