以高效的方式维护和更新 SQL 的大型子集
Maintaining and updating large subsets of SQL results in an efficient manner
在我们的网络堆栈中,我正在努力实现一项功能,允许用户基本上指定几个过滤条件(现在称为 list),这些条件是从tables 在 Postgres 数据库中。用户可以拥有许多这样的列表,每个列表的一组列都具有 "contains" 或 "equal to" 等条件。
其中涉及的逻辑相对简单,但问题开始是因为客户希望能够每天查看 changes/updates 列表查询结果(因此本质上是存储每天的增量快照)并且其中一些过滤条件可能很慢,并且 运行 非索引列上的大 tables(所讨论的 tables 每个有 2-3 百万行)。
目前我们使用 Redis 和 Postgres 作为我们的存储后端,我不完全确定什么是最好的方式来表示甚至管理这些日常更新,以及为每个用户的每个列表索引它们。
- 我看到的最大问题是确定昨天(最后运行)和今天(当前运行)查询返回的结果与之前存储结果集之间的差异我怀疑每个列表都是低效的。如果没有它,我不知道我们如何弄清楚两个查询之间发生了什么变化(这些变化可能发生在系统中的许多地方,因此捕获所有这些变化并在应用程序代码中处理它们可能是一项大量的工作) .
- 如果已知结果集之间的差异(需要知道每个字段及其新值),相关 table 将填充更改列表
我确信很多处理分析数据的软件都解决了类似的问题,但我不太熟悉如何有效地解决这个问题,我不想重新发明轮子,所以我想问问有没有人ideas/suggestions知道如何实现它(可能使用额外的软件以及PG和Redis)?
详细来说,必须每 12 小时(当前)对所有现有列表批量执行此操作,最有可能使用调用更新程序的守护程序(或只是一个 cronjob)。
(抱歉,如果这个问题看起来含糊不清,我试图概述我能想到的每一个可能的方面,但我不确定我是否做得足够好)
我们所做的是使用触发器将所有更改存储在审计 table 中,将 table 的键存储为文本数组,并将更改的值(旧值和新值)存储为两个 jsonb字段:
CREATE OR REPLACE FUNCTION text2intsafe(text) RETURNS int AS $$
SELECT CASE WHEN ~ '^ *-?\d{1,9} *$' THEN ::int END
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
CREATE TABLE IF NOT EXISTS crm.audit (
audit_id bigserial NOT NULL PRIMARY KEY,
date timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
username text NOT NULL DEFAULT crm.f_user(),
tableclass regclass NOT NULL,
action text NOT NULL CHECK (action IN ('I','U','D')),
id text[] NOT NULL,
old_data jsonb NULL,
new_data jsonb NULL
);
CREATE INDEX ON audit (date);
CREATE INDEX ON audit (tableclass, id);
CREATE INDEX ON audit USING gin (old_data);
CREATE INDEX audit_tableclass_id_idx ON audit (tableclass, text2intsafe(id[1]));
CREATE OR REPLACE FUNCTION f_audit_get_value(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS jsonb AS $$
SELECT COALESCE(
(SELECT a.old_data#>
FROM audit a
WHERE a.tableclass = AND a.date >= AND id = AND (a.old_data#>) IS NOT NULL
ORDER BY a.date LIMIT 1),
(SELECT a.new_data#>
FROM audit a
WHERE a.tableclass = AND a.date < AND id = AND (a.new_data#>) IS NOT NULL
ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;
CREATE OR REPLACE FUNCTION f_audit_get_text(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS text AS $$
SELECT COALESCE(
(SELECT a.old_data#>>
FROM audit a
WHERE a.tableclass = AND a.date >= AND id = AND (a.old_data#>) IS NOT NULL
ORDER BY a.date LIMIT 1),
(SELECT a.new_data#>>
FROM audit a
WHERE a.tableclass = AND a.date < AND id = AND (a.new_data#>) IS NOT NULL
ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;
CREATE OR REPLACE FUNCTION f_jsonb_diff(jsonb, jsonb) RETURNS jsonb AS $$
-- return an object with attributes from that differ from those of
SELECT json_object_agg(v2.key, CASE COALESCE(t2, t1)
WHEN 'object' THEN public.f_json_diff(CASE WHEN t1 IS NULL THEN '{}'::jsonb ELSE v1.value END, CASE WHEN t2 IS NULL THEN '{}'::jsonb ELSE v2.value END)
WHEN 'array' THEN public.f_json_array_diff(CASE WHEN t1 = 'array' THEN v1.value ELSE '[]'::jsonb END, CASE WHEN t2 = 'array' THEN v2.value ELSE '[]'::jsonb END)
ELSE v2.value END)::jsonb
FROM jsonb_each(COALESCE(CASE WHEN jsonb_typeof() = 'object' THEN END, '{}'::jsonb)) v2
LEFT JOIN jsonb_each(COALESCE(CASE WHEN jsonb_typeof() = 'object' THEN END, '{}'::jsonb)) v1 on (v1.key = v2.key)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v1.value), 'null') t1 ON (true)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v2.value), 'null') t2 ON (true)
WHERE v1.value IS DISTINCT FROM v2.value
$$ LANGUAGE sql IMMUTABLE;
CREATE OR REPLACE FUNCTION tf_audit() RETURNS TRIGGER AS $$
DECLARE
_id text[];
_key int2vector;
_orow jsonb;
_nrow jsonb;
_odata jsonb;
_ndata jsonb;
BEGIN
_key := indkey FROM pg_index WHERE indrelid = TG_RELID AND indisunique ORDER BY indisprimary DESC LIMIT 1;
IF TG_OP = 'INSERT' THEN
_ndata := to_jsonb(NEW);
_id := ARRAY(SELECT _ndata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
ELSIF TG_OP = 'UPDATE' THEN
_nrow := to_jsonb(NEW);
_orow := to_jsonb(OLD);
_odata := f_jsonb_diff(_nrow, _orow);
_ndata := f_jsonb_diff(_orow, _nrow);
_id := ARRAY(SELECT _nrow->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
ELSIF TG_OP = 'DELETE' THEN
_odata := to_jsonb(OLD);
_id := ARRAY(SELECT _odata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
END IF;
IF _odata <> '{}'::jsonb OR _ndata <> '{}'::jsonb THEN
INSERT INTO audit (username, tableclass, action, id, old_data, new_data)
VALUES (crm.f_user(), TG_RELID, substring(TG_OP FROM 1 FOR 1), _id, _odata, _ndata);
END IF;
RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
然后对于您要审核的每个 table,只需添加:
CREATE TRIGGER t_table_audit BEFORE UPDATE OR DELETE ON table
FOR EACH ROW EXECUTE PROCEDURE crm.tf_audit();
然后,您可以为任何已审核的table(从您实施审核之日起)的任何两个时间点之间生成准确的增量。
在我们的网络堆栈中,我正在努力实现一项功能,允许用户基本上指定几个过滤条件(现在称为 list),这些条件是从tables 在 Postgres 数据库中。用户可以拥有许多这样的列表,每个列表的一组列都具有 "contains" 或 "equal to" 等条件。
其中涉及的逻辑相对简单,但问题开始是因为客户希望能够每天查看 changes/updates 列表查询结果(因此本质上是存储每天的增量快照)并且其中一些过滤条件可能很慢,并且 运行 非索引列上的大 tables(所讨论的 tables 每个有 2-3 百万行)。
目前我们使用 Redis 和 Postgres 作为我们的存储后端,我不完全确定什么是最好的方式来表示甚至管理这些日常更新,以及为每个用户的每个列表索引它们。
- 我看到的最大问题是确定昨天(最后运行)和今天(当前运行)查询返回的结果与之前存储结果集之间的差异我怀疑每个列表都是低效的。如果没有它,我不知道我们如何弄清楚两个查询之间发生了什么变化(这些变化可能发生在系统中的许多地方,因此捕获所有这些变化并在应用程序代码中处理它们可能是一项大量的工作) .
- 如果已知结果集之间的差异(需要知道每个字段及其新值),相关 table 将填充更改列表
我确信很多处理分析数据的软件都解决了类似的问题,但我不太熟悉如何有效地解决这个问题,我不想重新发明轮子,所以我想问问有没有人ideas/suggestions知道如何实现它(可能使用额外的软件以及PG和Redis)?
详细来说,必须每 12 小时(当前)对所有现有列表批量执行此操作,最有可能使用调用更新程序的守护程序(或只是一个 cronjob)。
(抱歉,如果这个问题看起来含糊不清,我试图概述我能想到的每一个可能的方面,但我不确定我是否做得足够好)
我们所做的是使用触发器将所有更改存储在审计 table 中,将 table 的键存储为文本数组,并将更改的值(旧值和新值)存储为两个 jsonb字段:
CREATE OR REPLACE FUNCTION text2intsafe(text) RETURNS int AS $$
SELECT CASE WHEN ~ '^ *-?\d{1,9} *$' THEN ::int END
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
CREATE TABLE IF NOT EXISTS crm.audit (
audit_id bigserial NOT NULL PRIMARY KEY,
date timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
username text NOT NULL DEFAULT crm.f_user(),
tableclass regclass NOT NULL,
action text NOT NULL CHECK (action IN ('I','U','D')),
id text[] NOT NULL,
old_data jsonb NULL,
new_data jsonb NULL
);
CREATE INDEX ON audit (date);
CREATE INDEX ON audit (tableclass, id);
CREATE INDEX ON audit USING gin (old_data);
CREATE INDEX audit_tableclass_id_idx ON audit (tableclass, text2intsafe(id[1]));
CREATE OR REPLACE FUNCTION f_audit_get_value(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS jsonb AS $$
SELECT COALESCE(
(SELECT a.old_data#>
FROM audit a
WHERE a.tableclass = AND a.date >= AND id = AND (a.old_data#>) IS NOT NULL
ORDER BY a.date LIMIT 1),
(SELECT a.new_data#>
FROM audit a
WHERE a.tableclass = AND a.date < AND id = AND (a.new_data#>) IS NOT NULL
ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;
CREATE OR REPLACE FUNCTION f_audit_get_text(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS text AS $$
SELECT COALESCE(
(SELECT a.old_data#>>
FROM audit a
WHERE a.tableclass = AND a.date >= AND id = AND (a.old_data#>) IS NOT NULL
ORDER BY a.date LIMIT 1),
(SELECT a.new_data#>>
FROM audit a
WHERE a.tableclass = AND a.date < AND id = AND (a.new_data#>) IS NOT NULL
ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;
CREATE OR REPLACE FUNCTION f_jsonb_diff(jsonb, jsonb) RETURNS jsonb AS $$
-- return an object with attributes from that differ from those of
SELECT json_object_agg(v2.key, CASE COALESCE(t2, t1)
WHEN 'object' THEN public.f_json_diff(CASE WHEN t1 IS NULL THEN '{}'::jsonb ELSE v1.value END, CASE WHEN t2 IS NULL THEN '{}'::jsonb ELSE v2.value END)
WHEN 'array' THEN public.f_json_array_diff(CASE WHEN t1 = 'array' THEN v1.value ELSE '[]'::jsonb END, CASE WHEN t2 = 'array' THEN v2.value ELSE '[]'::jsonb END)
ELSE v2.value END)::jsonb
FROM jsonb_each(COALESCE(CASE WHEN jsonb_typeof() = 'object' THEN END, '{}'::jsonb)) v2
LEFT JOIN jsonb_each(COALESCE(CASE WHEN jsonb_typeof() = 'object' THEN END, '{}'::jsonb)) v1 on (v1.key = v2.key)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v1.value), 'null') t1 ON (true)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v2.value), 'null') t2 ON (true)
WHERE v1.value IS DISTINCT FROM v2.value
$$ LANGUAGE sql IMMUTABLE;
CREATE OR REPLACE FUNCTION tf_audit() RETURNS TRIGGER AS $$
DECLARE
_id text[];
_key int2vector;
_orow jsonb;
_nrow jsonb;
_odata jsonb;
_ndata jsonb;
BEGIN
_key := indkey FROM pg_index WHERE indrelid = TG_RELID AND indisunique ORDER BY indisprimary DESC LIMIT 1;
IF TG_OP = 'INSERT' THEN
_ndata := to_jsonb(NEW);
_id := ARRAY(SELECT _ndata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
ELSIF TG_OP = 'UPDATE' THEN
_nrow := to_jsonb(NEW);
_orow := to_jsonb(OLD);
_odata := f_jsonb_diff(_nrow, _orow);
_ndata := f_jsonb_diff(_orow, _nrow);
_id := ARRAY(SELECT _nrow->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
ELSIF TG_OP = 'DELETE' THEN
_odata := to_jsonb(OLD);
_id := ARRAY(SELECT _odata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
END IF;
IF _odata <> '{}'::jsonb OR _ndata <> '{}'::jsonb THEN
INSERT INTO audit (username, tableclass, action, id, old_data, new_data)
VALUES (crm.f_user(), TG_RELID, substring(TG_OP FROM 1 FOR 1), _id, _odata, _ndata);
END IF;
RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
然后对于您要审核的每个 table,只需添加:
CREATE TRIGGER t_table_audit BEFORE UPDATE OR DELETE ON table
FOR EACH ROW EXECUTE PROCEDURE crm.tf_audit();
然后,您可以为任何已审核的table(从您实施审核之日起)的任何两个时间点之间生成准确的增量。