重新同步 InfluxDB 值的时间戳

Re-synchronizing timestamps of InfluxDB values

我有几个值被放入不同主题下的 InfluxDB 中。它们通过 MQTT/JSON 提供并由 Telegraf 处理。每条 JSON 记录都会生成不同系列条目的元组,这些条目具有 稍微 不同的时间戳。 Δ 通常低于 1 毫秒,而 JSON 事件相隔几秒,因此检测应该是可管理的。

为了进一步分析,有必要对条目进行重新分组,以便再次合并元组。在大多数情况下,简单地四舍五入时间戳就可以了,但当然不是一般情况下,因为边界可能就在这样的元组之间。

有什么想法吗?我正在通过 Python 访问数据,因此在 Python 中进行适当的 Influx 查询或处理都可以。当然,我可以循环扫描数据并决定如何将它们放在一起,但我想知道手头是否有一个优雅的解决方案,也许可以使用常见的 Python 库之一,例如 NumPy 或 Pandas。我想我不是唯一遇到这种问题的人。

您可以使用 Flux 处理数据,并在时间戳上使用日期命令将其截断到不同的粒度,例如 date.minute() 或 date.millisecond()。

也许创建一个新字段,例如 newg,然后使用 set 将 _time 字段替换为它,或者调整现有查询以使用新列?

|> map(fn: (r) => ({ r with newg: date.millisecond(t: r._time) }))

好的,我选择在 Python 中处理我的数据。这就是我想出的。它可以处理任意粒度间隔。对于我正在处理的数据量,它足够快;除了排序,它是 O(n)。

# input is res: dictionary of {timestamp: (topic, value)}
DELTA = timedelta(seconds=0.5) # granularity; adjust to needs
t = datetime(dt.MINYEAR, 1, 1, tzinfo=dt.timezone.utc)  # init running tstamp var
t_group = t # timestamp for sample group
outlist = [] # output list
group = None # current sample group
for key, (topic, val) in sorted(res.items()):
  if (key - t) > DELTA: # new group
    t_group = key
    if group: outlist.append(group) # save previous group to output list
    group = {"time": t_group} # init new group with 1st timestamp
  print(f"{t_group}\t{key}\t{topic} = {val}\t∆={key-t}")
  group[topic] = val # add to group
  t = key
print(f"\n{len(outlist)} entries extracted.\n")