如何使用 Flask/python 在特定时间间隔内记录物联网数据

How to Log IoT Data in specific time interval using Flask/python

这是我的第一个问题,如有错误请见谅。我正在尝试使用 python/flask 开发软件,它不断从多个设备接收物联网数据,并在特定时间间隔记录数据。例如,我有 3 个设备并以 30 秒的间隔记录数据:device1、device2、device3 分别在 5:04:20、5:04:29、5;04:31 发送第一个数据。然后这些设备每隔 1 或 2 秒连续发送数据,我想跟踪上一个数据并确保下一个数据分别在 5:04:50、5:04:59、5:05:01 之后更新在 5:05:20 等等。

我已经编写了一个脚本来确保使用线程的单个设备做到这一点: 这是代码:

import paho.mqtt.client as mqtt
import csv
import os
import datetime
import threading
import queue
import time
q = queue.Queue()
header = ["Date", "Time", "Real_Speed"]
Rspd_key_1 = "key1="
Rspd_key_2 = "key2="
state = 0
message = ""
values = {"Date": 0, "Time": 0, "Real_Speed": 0}
writeFlag = True
logTime = 0
locallog = 0
nowTime = 0
dataUpdated = False
F_checkTime = True
prev_spd = 9999



def checkTime():
    global logTime
    global locallog
    global values
    global dataUpdated
    timesDataMissed = 0
    while (F_checkTime):
        nowTime = time.time()
        if(logTime != 0 and nowTime-logTime >= 30):
            values['Date'] = datetime.date.today().strftime("%d/%m/%Y")
            now = datetime.datetime.now()
            values['Time'] = now.strftime("%H:%M:%S")
            if(dataUpdated):
                q.put(values)
                logTime +=30
                dataUpdated = False
                print(values)
                timesDataMissed=0
            else:
                values['Real_Speed'] = 'NULL'
                q.put(values)
                logTime = nowTime
                dataUpdated = False
                timesDataMissed += 1
                print(values)
                if(timesDataMissed > 10):
                    timesDataMissed = 0
                    logTime = 0


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("something")


def write_csv():
    csvfile = open('InverterDataLogger01.csv', mode='w',
                   newline='',  encoding='utf-8')
    spamwriter = csv.DictWriter(csvfile, fieldnames=header)
    spamwriter.writeheader()
    csvfile.close()
    while writeFlag:
        # print("worker running ",csv_flag)
        time.sleep(0.01)
        # time.sleep(2)
        while not q.empty():
            results = q.get()
            if results is None:
                continue
            csvfile = open('InverterDataLogger01.csv', mode='a',
                           newline='',  encoding='utf-8')
            spamwriter = csv.DictWriter(csvfile, fieldnames=header)
            spamwriter.writerow(results)
            csvfile.close()
            print("Written in csv File")



def find_spd_val(message):
    Do Something
    return realspd


def on_message(client, userdata, msg):
    message = str(msg.payload.decode("utf-8", "ignore"))
    topic = str(msg.topic)
    global values
    global dataUpdated
    global r_index
    global prev_spd
    global rspd
    global locallog
    if(logTime==0):
        global logTime
        logTime = time.time()
        locallog=logTime

    else:
        try:
            rspd = int(find_spd_val(message))
        except:
            pass
        if(prev_spd == 9999):
            prev_spd = rspd
        else:
            values['Real_Speed'] = rspd



def on_publish(client, userdata, mid):
    print("Message Published")


client = mqtt.Client("hidden")
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.connect("hidden")
client.loop_start()
t1 = threading.Thread(target=write_csv)  # start logger
t2 = threading.Thread(target=checkTime)  # start logger

t1.start()  # start logging thread
t2.start()  # start logging thread

print('written')
try:
    while True:
        time.sleep(1)
        pass
except:
    print("interrrupted by keyboard")
    client.loop_stop()  # start loop
    writeFlag = False  # stop logging thread
    F_checkTime = False
    time.sleep(5)

我想用 python/flask 做同样的事情来处理多个设备。我是 Flask 的新手,能否请您给我一些指导,我怎样才能确保 Flask 中的这个功能,我应该使用什么技术?

我认为要正确回答您的问题需要更多上下文。但简单地说,您的设备可以发出 http 请求吗?如果可能的话,您可以创建一个 flask 网络应用程序来接收 http 调用并存储信息。我还看到你提到了 csv,这不是存储数据的好方法,因为依赖 read/write 文件不是一个好习惯。我建议使用适当的数据库(例如 mysql 等)以事务方式存储信息。

我将提出一个简单的解决方案,避免需要 运行 任何计时器,代价是延迟写入文件一秒钟左右(这是否是一个问题取决于您的要求).

来自设备的数据可以存储在类似于(一个非常粗略的示例!)的结构中:

from datetime import datetime

dev_info = { 
            'sensor1': {'last_value': .310, 'last_receive': datetime(2021, 8, 28, 12, 8, 1, 1234), 'last_write': datetime(2021, 8, 28, 12, 8, 0, 541)},
            'sensor2': {'last_value': 5.2, 'last_receive': datetime(2021, 8, 28, 12, 7, 59, 1234), 'last_write': datetime(2021, 8, 28, 12, 7, 58, 921)}
           }

每次收到新样本时(您可以为此使用单个订阅并通过检查 on_message 中的消息主题来确定消息来自哪个设备):

  1. 从结构中检索设备的 last_write 时间
  2. 如果它超过所需的时间间隔,则将 last_value 写入您的 CSV(使用时间戳 last_write + 时间间隔)并更新 last_write(需要一些逻辑在这里;考虑一下如果一分钟内没有收到任何信息会发生什么。
  3. 更新结构中设备的信息 (last_value / last_receive)。

正如我之前提到的那样,这样做的缺点是只有在您收到超出所需时间的新值后才会写出该值window;然而,对于许多用例来说,这很好,并且比使用计时器要简单得多。如果您需要更频繁的写入,那么您可以定期扫描结构中的旧数据并将其写出。

您可能还需要考虑其他一些因素:

  • MQTT 不保证实时交付(尤其是在 QOS 1+ 时)。
  • 物联网单元的通信通常不稳定,因此使用 QOS1+(和 clean_session=False)值得考虑。
  • 基于以上所述,您可能需要考虑在消息中嵌入时间戳(但这确实需要保持远程设备时钟同步)。
  • 存储很便宜 - 您确定存储所有接收到的数据然后再进行下采样没有任何好处吗?