为 DELETE 和 INSERT 声明 return 值

Declare and return value for DELETE and INSERT

我正在尝试根据唯一 ID 从我们的某些数据库中删除重复数据。所有删除的数据都应存储在单独的 table 中以供审计。因为它涉及相当多的数据库和不同的模式和 tables 我想开始使用变量来减少错误的可能性和我需要的工作量。

这是我能想到的最佳示例查询,但它不起作用:

do $$
declare @source_schema  varchar := 'my_source_schema';
declare @source_table   varchar := 'my_source_table';
declare @target_table   varchar := 'my_target_schema' || source_table || '_duplicates'; --target schema and appendix are always the same, source_table is a variable input.
declare @unique_keys    varchar := ('1', '2', '3') 

begin 
select into @target_table
from @source_schema.@source_table
where id in (@unique_keys);

delete from @source_schema.@source_table where export_id in (@unique_keys);

end ;
$$;

查询语法适用于硬编码值。

大多数时候,我的变量被视为列或根本无法识别。 :(

您需要创建并调用带有输入参数的 plpgsql 过程:

CREATE OR REPLACE PROCEDURE duplicates_suppress
(my_target_schema text, my_source_schema text, my_source_table text, unique_keys text[])
LANGUAGE plpgsql AS
$$
BEGIN

EXECUTE FORMAT(
'WITH list AS (INSERT INTO %1$I.%3$I_duplicates SELECT * FROM %2$I.%3$I WHERE array[id] <@ %4$L :: integer[] RETURNING id)
DELETE FROM %2$I.%3$I AS t USING list AS l WHERE t.id = l.id', my_target_schema, my_source_schema, my_source_table, unique_keys :: text) ;

END ;
$$ ;

过程 duplicates_suppressmy_source_schema.my_source_table 中 ID 在数组 unique_keys 中的行插入 my_target_schema.my_source_table || '_duplicates',然后从 table 中删除这些行my_source_schema.my_source_table .

查看dbfiddle中的测试结果。

如评论所述,您需要某种动态 SQL。在 FUNCTIONPROCEDUREDO 语句中在服务器上执行此操作。

你应该对table和PL/pgSQL感到舒服。动态SQL不是初学者的玩具。

带有 PROCEDURE 的示例,就像 Edouard 已经建议的那样。您需要 FUNCTION 而不是将其包装在外部事务中(就像您很可能那样)。参见:

CREATE OR REPLACE PROCEDURE pg_temp.f_archive_dupes(_source_schema text, _source_table text, _unique_keys int[], OUT _row_count int)
  LANGUAGE plpgsql AS
$proc$
   -- target schema and appendix are always the same, source_table is a variable input
DECLARE
   _target_schema CONSTANT text := 's2';  -- hardcoded
   _target_table  text := _source_table || '_duplicates';
   _sql           text := format(
'WITH del AS (
   DELETE FROM %I.%I
   WHERE  id = ANY()
   RETURNING *
   )
INSERT INTO %I.%I TABLE del', _source_schema, _source_table
                            , _target_schema, _target_table);
BEGIN
   RAISE NOTICE '%', _sql;           -- debug
   EXECUTE _sql USING _unique_keys;  -- execute

   GET DIAGNOSTICS _row_count = ROW_COUNT;
END
$proc$;

致电:

CALL pg_temp.f_archive_dupes('s1', 't1', '{1, 3}', 0);

db<>fiddle here

我将程序设为 临时,因为我认为您不需要永久保留它。每个数据库创建一次。参见:

  • How to create a temporary function in PostgreSQL?

传递的架构和 table 名称是 区分大小写的 字符串! (与普通 SQL 中不带引号的标识符不同。)无论哪种方式,在动态连接 SQL 时都要小心 SQL 注入。参见:

  • Are PostgreSQL column names case-sensitive?
  • Table name as a PostgreSQL function parameter

_unique_keys 键入 int[]integer 的数组),因为您的示例值看起来像整数。使用 id 列的 实际数据类型

变量_sql保存查询字符串,因此在实际执行之前可以轻松调试。为此目的使用 RAISE NOTICE '%', _sql;
我建议在您确定之前评论 EXECUTE 行。

我将 PROCEDURE return 设置为已处理的行数。您没有要求这样做,但这通常很方便。几乎不花任何代价。参见:

  • Dynamic SQL (EXECUTE) as condition for IF statement
  • Best way to get result count before LIMIT was applied

最后但同样重要的是,在 数据修改 CTE 中使用 DELETE ... RETURNING *。因为它只需要一次找到行,它的成本 是单独的 SELECTDELETE 的一半 。而且绝对安全。如果出现任何问题,整个事务都会回滚。
两个单独的命令也可以 运行 解决以这种方式排除的并发问题或竞争条件,因为 DELETE 隐式锁定要删除的行。示例:

  • Replicating data between Postgres DBs

您可以在客户端程序中构建语句。像psql一样,使用\gexec。示例:

  • Filter column names from existing table for SQL DDL statement

根据欧文的回答,小幅优化...

create or replace procedure pg_temp.p_archive_dump
    (_source_schema text, _source_table text,
        _unique_key int[],_target_schema text)
language plpgsql as
    $$
    declare
        _row_count bigint;
        _target_table text := '';
    BEGIN
        select quote_ident(_source_table) ||'_'|| array_to_string(_unique_key,'_')   into _target_table from quote_ident(_source_table);
        raise notice 'the deleted table records will store in %.%',_target_schema, _target_table;
        execute format('create table %I.%I as select * from %I.%I limit 0',_target_schema, _target_table,_source_schema,_source_table );

        execute format('with mm as ( delete from %I.%I where id = any (%L) returning * ) insert into %I.%I table mm'

            ,_source_schema,_source_table,_unique_key, _target_schema, _target_table);
    GET DIAGNOSTICS _row_count = ROW_COUNT;
    RAISE notice 'rows influenced, %',_row_count;
    end
$$;

-- 如果您的 _unique_key 不是那么多,此解决方案还会为您创建一个 table。显然,您需要自己创建目标架构。
如果您的 unique_key 太多,您可以自定义以正确重命名转储的 table.
就叫它吧。
call pg_temp.p_archive_dump('s1','t1', '{1,2}','s2');
s1 是源架构,t1 是源 table,{1,2} 是您要提取到新 table 的唯一键。 s2 是目标模式