用替换字段值查询后将系列转储回 InfluxDB
Dump series back into InfluxDB after querying with replaced field value
场景
我想通过查询来自 InfluxDB 的测量结果将数据发送到 MQTT Broker(云)。
我在架构中有一个 字段 ,称为 status
。它可以是 1
或 0
。 status=0
表示该系列还没有发送到云端。如果我从 MQTT Broker 得到确认,那么我希望使用 status=1
.
将查询重写回数据库
如FAQs for InfluxDB regarding Duplicate data中所述,如果信息具有与先前查询相同的时间戳但具有不同的字段值=>则将显示更新字段。
为了对此进行测试,我创建了以下内容:
CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300
查询:
SELECT * FROM meas_1
提供
time status type value
1536157064275338300 0 t1 234
现在,如果我想覆盖系列,我会执行以下操作:
INSERT meas_1, type=t1, status=1,value=123 1536157064275338300
这将覆盖系列
time status type value
1536157064275338300 1 t1 234
(注意:通过 Tags 当前在 InfluxDB 中这是不可能的)
用法
- 使用客户端
"status"=0
查询一些信息。
- 重构JSON发送到云端
- 将信息发送到云端
- 如果成功,则将第 1 步的输出写回数据库,但使用
status=1
。
我正在使用 InfluxDBClient Python3
创建应用程序 (MQTT + InfluxDB)
在 write_points
API 中有一个参数提到 batch_size
需要 int
作为输入。
我不确定如何将它用于我想要的应用程序。有人可以用这个或数据库的架构指导我,以便我可以将实际和非冗余信息上传到云吗?
batch_size
实际上是需要传递给 write_points
的测量列表的长度。
步骤
创建客户端并从测量中查询(这里我们查询gps信息)
client = InfluxDBClient(database='dummy')
op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
将ResultSet
放入列表中:
batch = list(op.get_points('gps'))
为更新创建一个空列表
updated_batch = []
解析每个测量并将 status
标志更改为 1
。注意,InfluxDB 中的默认值是 float
for each in batch:
new_mes = {
'measurement': 'gps',
'tags': {
'type': 'gps'
},
'time': each['time'],
'fields': {
'lat': float(each['lat']),
'lon': float(each['lon']),
'alt': float(each['alt']),
'status': float(1)
}
}
updated_batch.append(new_mes)
最后通过客户端将点数转回batch_size
作为updated_batch
的长度
client.write_points(updated_batch, batch_size=len(updated_batch))
这会覆盖系列,因为它包含相同的时间戳,status
字段设置为 1
场景
我想通过查询来自 InfluxDB 的测量结果将数据发送到 MQTT Broker(云)。
我在架构中有一个 字段 ,称为 status
。它可以是 1
或 0
。 status=0
表示该系列还没有发送到云端。如果我从 MQTT Broker 得到确认,那么我希望使用 status=1
.
如FAQs for InfluxDB regarding Duplicate data中所述,如果信息具有与先前查询相同的时间戳但具有不同的字段值=>则将显示更新字段。
为了对此进行测试,我创建了以下内容:
CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300
查询:
SELECT * FROM meas_1
提供
time status type value
1536157064275338300 0 t1 234
现在,如果我想覆盖系列,我会执行以下操作:
INSERT meas_1, type=t1, status=1,value=123 1536157064275338300
这将覆盖系列
time status type value
1536157064275338300 1 t1 234
(注意:通过 Tags 当前在 InfluxDB 中这是不可能的)
用法
- 使用客户端
"status"=0
查询一些信息。 - 重构JSON发送到云端
- 将信息发送到云端
- 如果成功,则将第 1 步的输出写回数据库,但使用
status=1
。
我正在使用 InfluxDBClient Python3
创建应用程序 (MQTT + InfluxDB)
在 write_points
API 中有一个参数提到 batch_size
需要 int
作为输入。
我不确定如何将它用于我想要的应用程序。有人可以用这个或数据库的架构指导我,以便我可以将实际和非冗余信息上传到云吗?
batch_size
实际上是需要传递给 write_points
的测量列表的长度。
步骤
创建客户端并从测量中查询(这里我们查询gps信息)
client = InfluxDBClient(database='dummy') op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
将
ResultSet
放入列表中:batch = list(op.get_points('gps'))
为更新创建一个空列表
updated_batch = []
解析每个测量并将
status
标志更改为1
。注意,InfluxDB 中的默认值是 floatfor each in batch: new_mes = { 'measurement': 'gps', 'tags': { 'type': 'gps' }, 'time': each['time'], 'fields': { 'lat': float(each['lat']), 'lon': float(each['lon']), 'alt': float(each['alt']), 'status': float(1) } } updated_batch.append(new_mes)
最后通过客户端将点数转回
的长度batch_size
作为updated_batch
client.write_points(updated_batch, batch_size=len(updated_batch))
这会覆盖系列,因为它包含相同的时间戳,status
字段设置为 1