Python SQLAlchemy 结构和平均数据检索
Python SQLAlchemy structure and averaged data retrieval
我有一个使用 Flask-sqlalchemy 的数据库,我想存储和检索传感器的(平均)温度和湿度数据。
我在下面写了 class 将两者存储在一起,因为我认为它是一个传感器并且这些值对于它们被测量的相同时间步长有效。
我的问题是,我怎样才能做到这一点?
具体我发现,有些百分位的数据检索需要很长时间~5s。我知道它的编程效率很低——我该如何有意识地改进它?我唯一真正需要的是一个函数,用于检索某个时间范围内的测量值或按我放入它们的顺序检索测量值,这与移动平均线或分块平均线等效。
大部分时间消耗在:
TempHum.query.filter(TempHum.timestamp>开始时间)
主要关注点是:如何有效地构造数据库以及如何有效地检索(和过滤)数据。
#!/usr/bin/python3
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
from math import floor
db = SQLAlchemy()
# Define a Table in the database
class TempHum(db.Model):
__tablename__ = 'tempHumMeasurements'
# Here we define columns for the table
# Each column is also a Python attribute.
id = db.Column(db.Integer, primary_key = True)
timestamp = db.Column(db.DateTime(timezone=True))
temperature = db.Column('temperature', db.Float)
humidity = db.Column('humidity', db.Float)
location = db.Column('location', db.String(32))
def __init__(self, **kwargs):
super(TempHum, self).__init__(**kwargs)
# do custom initialization here
#now.isoformat(timespec="seconds")
self.timestamp = datetime.now().replace(microsecond=0)
self.location = 'room1'
def __repr__(self):
return "<TempHum {}>".format(self.timestamp.strftime("%d %m %Y, %H:%M"))
def getNSamples(n):
return TempHum.query.\
order_by(TempHum.timestamp.desc()).\
limit(n).\
from_self().order_by(TempHum.timestamp.asc()).all()
def getSampledNDays(n):
starttime = datetime.now() - timedelta(days=n)
meas = TempHum.query.\
filter(TempHum.timestamp>starttime).\
order_by(TempHum.timestamp.asc()).all()
return meas1
# ___________
# helper functions
def mean(lst):
return sum(lst)/len(lst)
def averageData(measures):
mid = floor(len(measures)/2)
meantemp = TempHum.mean([tmp.temperature for tmp in measures])
meanhum = TempHum.mean([tmp.humidity for tmp in measures])
means = TempHum(temperature=meantemp,humidity=meanhum)
means.timestamp = measures[mid].timestamp
return means
def downsample(arr,average_mins):
# if values have the correct sampling time they are averaged
# over average_count values, otherwise all data is taken into account
# arr is an array of TempHum objects that should be downsampled
average_time = timedelta(minutes=average_mins)
buffer = []
output_array = []
for k,date in enumerate(arr):
if not buffer: # buffer first one if buffer is empty
buffer.append(arr[k])
continue
timestep = arr[k].timestamp-buffer[0].timestamp
if timestep<average_time:
# values within the window
buffer.append(arr[k])
else:
averaged = TempHum.averageData(buffer)
output_array.append(averaged)
buffer.clear()
buffer.append(arr[k])
return output_array;
class HeaterState(db.Model):
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime(timezone=True))
heating = db.Column('heating', db.Boolean)
def __init__(self, **kwargs):
super(HeaterState, self).__init__(**kwargs)
self.timestamp = datetime.now().replace(microsecond=0)
def __repr__(self):
return "<HeaterState {}>".format(self.timestamp.strftime("%d %m %Y, %H:%M"))
def getSampledNDays(n):
starttime = datetime.now() - timedelta(days=n)
meas = HeaterState.query.\
filter(HeaterState.timestamp>starttime).\
order_by(HeaterState.timestamp.asc()).all()
return meas
您是否尝试过在 timestamp
列上添加索引?
像这样:
class TempHum(db.Model):
...
timestamp = db.Column(db.DateTime(timezone=True), db_index=True)
好吧,我之前尝试过正确的方法,但是我被奇怪的效果所困扰并且调试得不够好。
我发现最有效的查询是:
meas = TempHum.query.order_by(TempHum.id.desc()).limit(n).all()
这大大减少了我的 4320 个样本(以分钟计 3 天)查询,现在需要 1.8 秒(与之前 >4.7 秒相比)。
当我只对温度或湿度进行平均时,平均又 0.1 秒消失了,而不是两者。
总的来说我还不是很满意,不过最大的失误好像是那两个。
我之前尝试过的这个查询的问题是我对 timedelta 的处理。我现在用
if abs(timediff) < average_time:
这在这里更有意义并且允许处理反向列表。
索引tipp还是很有价值的,值得考虑。但是,我认为 id 是自动索引的,所以在这里,一切都很好。
我有一个使用 Flask-sqlalchemy 的数据库,我想存储和检索传感器的(平均)温度和湿度数据。 我在下面写了 class 将两者存储在一起,因为我认为它是一个传感器并且这些值对于它们被测量的相同时间步长有效。
我的问题是,我怎样才能做到这一点?
具体我发现,有些百分位的数据检索需要很长时间~5s。我知道它的编程效率很低——我该如何有意识地改进它?我唯一真正需要的是一个函数,用于检索某个时间范围内的测量值或按我放入它们的顺序检索测量值,这与移动平均线或分块平均线等效。 大部分时间消耗在: TempHum.query.filter(TempHum.timestamp>开始时间)
主要关注点是:如何有效地构造数据库以及如何有效地检索(和过滤)数据。
#!/usr/bin/python3
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
from math import floor
db = SQLAlchemy()
# Define a Table in the database
class TempHum(db.Model):
__tablename__ = 'tempHumMeasurements'
# Here we define columns for the table
# Each column is also a Python attribute.
id = db.Column(db.Integer, primary_key = True)
timestamp = db.Column(db.DateTime(timezone=True))
temperature = db.Column('temperature', db.Float)
humidity = db.Column('humidity', db.Float)
location = db.Column('location', db.String(32))
def __init__(self, **kwargs):
super(TempHum, self).__init__(**kwargs)
# do custom initialization here
#now.isoformat(timespec="seconds")
self.timestamp = datetime.now().replace(microsecond=0)
self.location = 'room1'
def __repr__(self):
return "<TempHum {}>".format(self.timestamp.strftime("%d %m %Y, %H:%M"))
def getNSamples(n):
return TempHum.query.\
order_by(TempHum.timestamp.desc()).\
limit(n).\
from_self().order_by(TempHum.timestamp.asc()).all()
def getSampledNDays(n):
starttime = datetime.now() - timedelta(days=n)
meas = TempHum.query.\
filter(TempHum.timestamp>starttime).\
order_by(TempHum.timestamp.asc()).all()
return meas1
# ___________
# helper functions
def mean(lst):
return sum(lst)/len(lst)
def averageData(measures):
mid = floor(len(measures)/2)
meantemp = TempHum.mean([tmp.temperature for tmp in measures])
meanhum = TempHum.mean([tmp.humidity for tmp in measures])
means = TempHum(temperature=meantemp,humidity=meanhum)
means.timestamp = measures[mid].timestamp
return means
def downsample(arr,average_mins):
# if values have the correct sampling time they are averaged
# over average_count values, otherwise all data is taken into account
# arr is an array of TempHum objects that should be downsampled
average_time = timedelta(minutes=average_mins)
buffer = []
output_array = []
for k,date in enumerate(arr):
if not buffer: # buffer first one if buffer is empty
buffer.append(arr[k])
continue
timestep = arr[k].timestamp-buffer[0].timestamp
if timestep<average_time:
# values within the window
buffer.append(arr[k])
else:
averaged = TempHum.averageData(buffer)
output_array.append(averaged)
buffer.clear()
buffer.append(arr[k])
return output_array;
class HeaterState(db.Model):
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime(timezone=True))
heating = db.Column('heating', db.Boolean)
def __init__(self, **kwargs):
super(HeaterState, self).__init__(**kwargs)
self.timestamp = datetime.now().replace(microsecond=0)
def __repr__(self):
return "<HeaterState {}>".format(self.timestamp.strftime("%d %m %Y, %H:%M"))
def getSampledNDays(n):
starttime = datetime.now() - timedelta(days=n)
meas = HeaterState.query.\
filter(HeaterState.timestamp>starttime).\
order_by(HeaterState.timestamp.asc()).all()
return meas
您是否尝试过在 timestamp
列上添加索引?
像这样:
class TempHum(db.Model):
...
timestamp = db.Column(db.DateTime(timezone=True), db_index=True)
好吧,我之前尝试过正确的方法,但是我被奇怪的效果所困扰并且调试得不够好。 我发现最有效的查询是:
meas = TempHum.query.order_by(TempHum.id.desc()).limit(n).all()
这大大减少了我的 4320 个样本(以分钟计 3 天)查询,现在需要 1.8 秒(与之前 >4.7 秒相比)。 当我只对温度或湿度进行平均时,平均又 0.1 秒消失了,而不是两者。
总的来说我还不是很满意,不过最大的失误好像是那两个。
我之前尝试过的这个查询的问题是我对 timedelta 的处理。我现在用
if abs(timediff) < average_time:
这在这里更有意义并且允许处理反向列表。
索引tipp还是很有价值的,值得考虑。但是,我认为 id 是自动索引的,所以在这里,一切都很好。