具有间隙的不规则数据的时间序列 DBMS

Time Series DBMS for irregular data with gaps

是否存在可以填充不规则时间序列中的空缺并对其进行分组的数据库引擎?

功能最强大的系统是 InfluxDB:

> select * from test;
name: test
time value1 value2
---- ------ ------
1    1      0
2    2      
3    3      
4    4      
5    5      2
6    6      
7    7      
8    8      

> select * from test fill(previous);
name: test
time value1 value2
---- ------ ------
1    1      0
2    2      0
3    3      0
4    4      0
5    5      2
6    6      2
7    7      2
8    8      2

不幸的是,它没有对不是标签的键进行分组。不过,我需要对任何列进行分组:

> select mean(value1) from (select value1, value2 from test fill(previous)) group by value2;
name: test
tags: value2=
time mean
---- ----
0    4.5

我也试过TimescaleDB+PostreSQL,但是TimescaleDB的间隙填充功能很差。我使用自纪元以来的毫秒数作为我的真实数据中的时间戳,并且 TimescaleDB 填补了我的行之间的所有空白,即使我在特定时间间隔内没有记录任何内容也是如此。由于 TimescaleDB 的采样,具有 17k 行的 table 变成了具有 500k 行的 table。

编辑

因为我被要求提供更多关于我尝试使用 TimescaleDB 满足我的需求的信息;我关注了这个页面上的信息:

https://blog.timescale.com/blog/sql-functions-for-time-series-analysis/

locf 正在做我想做的事情。

这里的问题是,如果没有像avg这样的聚合函数,就不能使用locf。你必须像这里描述的那样传递非常不同的时间间隔:

https://docs.timescale.com/latest/api#locf

我正在分析阀门状态。它们可以是打开的或关闭的。为了节省上传时间,我只在阀门实际切换状态时上传数据。但我也想知道在另一个阀门打开时是否打开了一个阀门。因此我需要填补每一行中的所有空白,或者换句话说 "last observation carried forward".

我的一个测试数据集有 17000 行阀门开关。如果我将 1ms 设置为时间间隔(因为在每个 ms 中阀门可以开始切换其状态)以便使用 locfavg只是一个观察,我没有得到例如 0.5 的阀门状态。它可以打开或关闭。但是由于 time_bucket_gapfill() 函数,我的行数从 17000 扩展到 500000。

查询有点像这样:

SELECT time_bucket_gapfill
(
'1 millisecond',
ts::TIMESTAMP,
start => '2020-03-11 11:09:49',
finish => '2020-03-11 11:17:16'
) AS tsi,
LOCF(avg(airpressure1))
FROM (SELECT TO_TIMESTAMP(TIME::DOUBLE precision / 1000.0)::TIMESTAMP AS TS,
             airpressure1,
             airpressure2,
             airpressure3,
             airpressure4
      FROM ts_data
      WHERE process_id = 1
      ORDER BY TO_TIMESTAMP(TIME::DOUBLE precision / 1000.0)::TIMESTAMP DESC) AS T
GROUP BY tsi;

编辑 2

真实的例子是,没有打开安全阀之前不允许打开气压阀。随着空白的填补,它很容易被质疑。但是还有更多的用例。 table中也有类似压力的模拟数据,但为了简单起见,我们跳过它。

SELECT TIME,
       airpressure1 as air1,
       safetyvlv_out1 as safety1,
       airpressure2 as air2,
       safetyvlv_out2 as safety2,
       airpressure3 as air3,
       safetyvlv_out3 as safety3,
       airpressure4 as air4,
       safetyvlv_out4 as safety4
FROM ts_data_gaps
WHERE airpressure1 IS NOT NULL
OR    safetyvlv_out1 IS NOT NULL;

time    air1    safety1 air2    safety2 air3    safety3 air4    safety4
1583921837975       1.0     1.0     1.0     1.0
1583921844020       0.0     0.0     0.0     0.0
1583921868224       1.0                     
1583921878845   1.0                         
1583921878985   0.0                         
1583921879798   1.0                         
1583921879900   0.0                         
1583921880537       0.0                     
1583921915212       1.0     1.0     1.0     1.0
1583921919219   1.0     1.0     1.0     1.0 
1583921919488   0.0     0.0     0.0     0.0 
1583921923488   1.0     1.0     1.0     1.0 
1583921926491   0.0     0.0     0.0     0.0 
1583921930497   1.0     1.0     1.0     1.0 
1583921933501   0.0     0.0     0.0     0.0 
1583921934008       0.0     0.0     0.0     0.0
1583921389639       1.0     1.0     1.0     1.0
1583921395681       0.0     0.0     0.0     0.0
1583921415256       1.0                     
1583921425912   1.0                         
1583921426027   0.0                         
1583921426729   1.0                         
1583921426837   0.0                         
1583921427043   1.0                         
1583921427084   0.0                         
1583921427582       0.0     

当我在 17k 行上做这样的事情时:

create view z as
SELECT TIME,
       airpressure1 as air1,
       safetyvlv_out1 as safety1,
       airpressure2 as air2,
       safetyvlv_out2 as safety2,
       airpressure3 as air3,
       safetyvlv_out3 as safety3,
       airpressure4 as air4,
       safetyvlv_out4 as safety4
FROM ts_data_gaps
WHERE airpressure1 IS NOT NULL
OR    safetyvlv_out1 IS NOT NULL;

SELECT time_bucket_gapfill
(
'1 millisecond',
ts::TIMESTAMP,
start => '2020-03-11 03:41:08',
finish => '2020-03-11 11:18:54'
) AS tsi,
LOCF(avg(air1)) as air1,
LOCF(avg(safety1)) as safety1
FROM (SELECT TO_TIMESTAMP(TIME::DOUBLE precision / 1000.0)::TIMESTAMP AS TS,
             air1, safety1
      FROM z
      ORDER BY TO_TIMESTAMP(TIME::DOUBLE precision / 1000.0)::TIMESTAMP DESC) AS T
GROUP BY tsi;

我的 32 GB RAM PC 运行 内存不足!

看来您不需要填空。只是为了传播已经存在的时间戳的缺失值。 你可以获得

select * from test fill(previous);
name: test
time value1 value2
---- ------ ------
1    1      0
2    2      0
3    3      0
4    4      0
5    5      2
6    6      2
7    7      2
8    8      2

在 PostgreSQL(和 TimescaleDB)中

create view x as select time, value2 from test where value2 is not null;
select * from x;
time value2
---- ------
1    0
5    2
create view y as 
   select time t_start, lead(time, 1) over (order by time) t_end, value2 from x;
 select * from y;
t_start, t_end, value2
-------  -----  ------
1        5      0
5               2
select test.time, test.value1, y.value2
from test 
join y 
  on test.time >=y.t_start and (test.time<y.t_end or t_end is null);

我创建了一堆视图只是为了更清楚地显示步骤。 x 是一个视图,它只包含那些具有值的行,y 使用 window 函数来查找应该使用值的间隔 然后通过测试 table 和 y 的简单连接以您表达的格式给出结果,但我认为 y 中的格式在应用程序

中更有用

最后的最终解决方案是使用 Azure Cloud 和 Azure Data Explorer。这是我见过的最强大的工具(除了在 python 中编写一些分析程序之外)。它满足了我的所有需求,并且非常擅长分析文档、表格和时间序列。它使用 Kusto 作为查询语言,这也(与 MongoDB 查询语言相比)更加简单和直观。 如果您寻求开源和免费解决方案,显然这不是最佳解决方案。

作为更新,在 Timescale Toolkit 中,我们有一个带有 timevectors and function pipelines that does this sort of thing, it's called fill_to(). Docs are here: https://docs.timescale.com/timescaledb/latest/how-to-guides/hyperfunctions/function-pipelines/#compound-transforms 的实验函数(稍微向下滚动)

您可以使用它来填补空白而无需聚合,因为这张海报提到的是以前迭代的问题。