Storing ranged timeseries data in Postgres
I need to store netflow data in PostgreSQL. This is data about network traffic. Each record contains:
- Connection start time
- Connection end time
- Total data transferred
- Source/destination IPs/ASNs
- (There is plenty more, but that is enough for the purposes of this question.)

My question is: how do I store this data so that I can efficiently calculate data transfer rates for the last X days/hours? For example, I might want to draw a graph showing all traffic for Netflix's ASN over the last 7 days, in hourly buckets.

The gap between a connection's start and end times may be a few milliseconds or well over an hour.

My first step was to store each connection in a TSTZRANGE field with a GiST index (a sketch of such a table follows this list). Then, to query hourly traffic data for the last 7 days:
- Use a CTE to generate a series of hourly time buckets
- Find any TSTZRANGE that overlaps each bucket
- Calculate the duration of the overlap
- Calculate the record's data rate in bytes/second
- Do duration * bytes per second to get the total data
- Group it all by bucket, summing the total-data values

However, this sounds like a lot of heavy lifting. Can anyone think of a better alternative?
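For reference, this is roughly the table shape I have in mind. The column names follow what the trigger code further down references; the types are illustrative:

-- Illustrative only: raw flows stored with the connection as a range,
-- plus a GiST index so the && (overlaps) operator can use an index scan.
create table traffic_flow (
    id                 bigserial        primary key,
    range              tstzrange        not null,  -- connection start .. end
    bytes_per_second   double precision not null,
    packets_per_second double precision not null,
    source_address     inet             not null,
    as_number          integer          not null
);

create index traffic_flow_range_idx on traffic_flow using gist (range);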
First draft:
WITH ts_bucket AS (
    SELECT
        LAG(gs, 1) OVER () AS begin_period,
        gs AS end_period
    FROM
        generate_series('1/25/2021 0:00-8'::timestamptz,
                        '1/26/2021 0:00-8'::timestamptz,
                        '1 hour') AS gs
),
se AS (
    SELECT
        1000000 AS bytes,
        '01/25/2021 11:35-8'::timestamptz AS start_ts,
        '01/25/2021 12:45-08'::timestamptz AS end_ts
)
SELECT
    *,
    -- seconds of overlap between the bucket and the connection,
    -- multiplied by the connection's average bytes/second
    extract('epoch' FROM
        upper(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]'))
      - lower(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]')))
    * bytes
    / extract('epoch' FROM end_ts - start_ts) AS data_transferred
FROM
    ts_bucket,
    se
WHERE
    begin_period IS NOT NULL
    AND tstzrange(se.start_ts, se.end_ts, '[]') && tstzrange(ts_bucket.begin_period, ts_bucket.end_period, '[]');
begin_period | end_period | bytes | start_ts | end_ts | data_transferred
------------------------+------------------------+---------+------------------------+------------------------+--------------------
2021-01-25 11:00:00-08 | 2021-01-25 12:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 | 357142.85714285716
2021-01-25 12:00:00-08 | 2021-01-25 13:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 | 642857.1428571428
This is based on storing the connection start and end times in separate fields, then converting them to ranges as needed.
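Scaled up to the real thing, the full per-bucket aggregation would look something like this. The flows table and its column names are illustrative, as is the ASN value:

WITH buckets AS (
    SELECT gs AS bucket_start, gs + interval '1 hour' AS bucket_end
    FROM generate_series(
        date_trunc('hour', now() - interval '7 days'),
        date_trunc('hour', now()),
        interval '1 hour') AS gs
)
SELECT
    b.bucket_start,
    -- overlap seconds * the flow's average bytes/second, summed per bucket
    SUM(
        extract(epoch FROM
            upper(tstzrange(f.start_ts, f.end_ts) * tstzrange(b.bucket_start, b.bucket_end))
          - lower(tstzrange(f.start_ts, f.end_ts) * tstzrange(b.bucket_start, b.bucket_end)))
        * f.bytes
        / extract(epoch FROM f.end_ts - f.start_ts)
    ) AS bytes_in_bucket
FROM buckets b
JOIN flows f
    ON tstzrange(f.start_ts, f.end_ts) && tstzrange(b.bucket_start, b.bucket_end)
WHERE f.as_number = 2906  -- example ASN
GROUP BY b.bucket_start
ORDER BY b.bucket_start;

A LEFT JOIN plus COALESCE would keep buckets with no traffic in the output.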
After digging into this further, I think the real answer is that there is no out-of-the-box way to do this efficiently, especially as the volume of data grows. Aggregating thousands of rows at query time ends up being slow, simply because that is a lot of data to touch.
Instead, I went down a different route: PostgreSQL triggers on the table that stores the raw flows (traffic_flow). Every time a record is inserted into traffic_flow, the trigger folds the new data into separate aggregation tables for daily, hourly, and per-minute data.

Here is my experimental implementation, in case it is useful to someone. It could be improved to handle updates and deletes as well.
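The aggregate tables themselves look roughly like this, one per resolution; column types are illustrative. The unique constraint is what the ON CONFLICT clause in the function below relies on:

create table traffic_perhourtraffic (
    id          bigserial   primary key,
    "timestamp" timestamptz not null,  -- start of the aggregation bucket
    customer_ip inet        not null,  -- '100.64.0.0' marks the network-wide rows
    as_number   integer     not null,
    bytes       bigint      not null default 0,
    packets     bigint      not null default 0,
    unique (customer_ip, "timestamp", as_number)
);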
create or replace function update_aggregated_traffic(NEW RECORD, table_name TEXT, interval_name TEXT, store_customer BOOLEAN)
    returns void
    language plpgsql
as
$body$
declare
    aggregate_interval interval;
    customer_ip_       inet;
begin
    -- Update the aggregated traffic data given the insertion of a new flow.
    -- A flow is the data about a single connection (start time, stop time, total
    -- bytes/packets). This function essentially rasterises that data into a
    -- series of aggregation buckets.
    -- interval_name should be day, hour, minute, or second.

    -- Turn the interval_name into an actual INTERVAL
    aggregate_interval = ('1 ' || interval_name)::INTERVAL;

    if store_customer then
        customer_ip_ = NEW.source_address;
    else
        customer_ip_ = '100.64.0.0'::INET;
    end if;

    -- We need to insert into a dynamically generated table name. There is
    -- no way to do this without writing the whole SQL statement as a string.
    -- Instead, let's use a trick: create a temporary view, then insert into that.
    -- Postgres will proxy the insert through to the desired table.
    drop view if exists table_pointer;
    execute format('create temporary view table_pointer as select * from %s', table_name);

    -- We use a CTE to keep things readable, even though it is pretty long
    with aggregate_range AS (
        -- Create all the aggregate buckets spanned by the inserted flow
        SELECT generate_series(
            date_trunc(interval_name, lower(NEW.range)),
            date_trunc(interval_name, upper(NEW.range)),
            aggregate_interval
        ) as range_lower
    ),
    -- For each bucket, figure out its overlap with the provided flow data.
    -- Only the first and last buckets will have less than complete overlap,
    -- but we do the calculation for all buckets anyway
    with_overlaps AS (
        SELECT
            NEW.range * tstzrange(range_lower, range_lower + aggregate_interval) AS overlap,
            range_lower
        FROM
            aggregate_range
    ),
    -- Convert the overlap intervals into seconds (FLOAT)
    with_overlap_seconds AS (
        SELECT
            extract(epoch from (upper(overlap) - lower(overlap))) as overlap_seconds,
            range_lower
        FROM
            with_overlaps
    )
    -- Now we have enough information to do the inserts
    insert into table_pointer as traffic
        (timestamp, customer_ip, as_number, bytes, packets)
    select
        range_lower,
        customer_ip_,
        NEW.as_number,
        -- Scale the per-second bytes/packets rates up to totals for this bucket
        round(NEW.bytes_per_second * overlap_seconds)::INT,
        round(NEW.packets_per_second * overlap_seconds)::INT
    from with_overlap_seconds
    -- We shouldn't have any 0-second overlaps, but let's just be sure
    where overlap_seconds > 0
    -- If there is already existing data, then increment the bytes/packets values
    on conflict (customer_ip, timestamp, as_number) DO UPDATE SET
        bytes   = EXCLUDED.bytes + traffic.bytes,
        packets = EXCLUDED.packets + traffic.packets;
end;
$body$;
create or replace function update_aggregated_traffic_hourly() returns trigger
    language plpgsql
as
$body$
begin
    -- Store aggregated data for different resolutions. For each we also store data
    -- without the customer information. This way we can efficiently see traffic data
    -- for the whole network.
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic', 'day', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic', 'day', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic', 'hour', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic', 'hour', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic', 'minute', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic', 'minute', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic', 'second', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic', 'second', False);
    return NEW;
end;
$body$;
create trigger update_aggregated_traffic_hourly_trigger AFTER INSERT ON traffic_flow
FOR EACH ROW EXECUTE PROCEDURE update_aggregated_traffic_hourly();
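With the aggregates in place, the kind of query from the original question becomes a cheap scan of the hourly table. The ASN value is just an example, and '100.64.0.0' is the sentinel used above for the network-wide rows:

select timestamp, bytes, packets
from traffic_perhourtraffic
where timestamp >= now() - interval '7 days'
  and as_number = 2906                    -- example ASN
  and customer_ip = '100.64.0.0'::inet    -- network-wide aggregate rows
order by timestamp;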