如何在函数内部强制执行 COMMIT,以便其他会话可以看到更新的行?

How to force COMMIT inside function so other sessions can see updated row?

在 Postgres 12 数据库中,我在一个函数中有多个查询(SELECTUPDATE、...),这些查询总共需要大约 20 分钟才能完成。 如果 status 不是 运行:

,我在顶部有一个检查 UPDATE
create or replace function aaa.fnc_work() returns varchar as 
$body$
    begin
        if (select count(*) from aaa.monitor where id='invoicing' and status='running')=0 then
           return 'running';
        else
           update aaa.monitor set status='running' where id='invoicing';
        end if;
        --- rest of code ---
        --finally
        update aaa.monitor set status='idle' where id='invoicing';
        return '';
    exception when others then
         return SQLERRM::varchar;
    end
$body$
language plpgsql;

这个想法是为了防止其他用户在 status 空闲之前执行 --- rest of code ---

但是,其他人(调用相同的函数)似乎看不到更新的状态,他们也继续并开始执行 --- rest of code ---。如何在以下时间后强制提交:

update aaa.monitor set status='running' where id='invoicing';

以便所有其他用户会话可以看到更新后的 status 并相应地退出。

我需要交易吗?

您要完成的是自治事务。 PostgreSQL 没有简单的方法来做到这一点。 link here 讨论了一些备选方案。

但是上面 linked 文章中讨论的一种方法是使用 PostgreSQL dblink 扩展。

您需要将扩展​​程序添加到服务器

CREATE EXTENSION dblink; 

然后您可以创建一个从您的函数中调用的新函数

CREATE FUNCTION update_monitor_via_dblink(msg text)
 RETURNS void
 LANGUAGE sql
AS $function$
   select dblink('host=/var/run/postgresql port=5432 user=postgres dbname=postgres',
    format(' update aaa.monitor set status= %M',msg::text)
$function$;

您可能要考虑的另一件事是使用 PostgreSQL 锁。可以找到更多信息 here

继续阅读。我把最好的留到最后。

概念验证 PROCEDURE

Postgres FUNCTION 始终是原子的(在单个事务包装器中运行)并且不能处理事务。所以 COMMIT 是不允许的。您 可以 使用 dblink 的技巧来解决这个问题。参见:

  • Does Postgres support nested or autonomous transactions?
  • How do I do large non-blocking updates in PostgreSQL?

但对于像这样的嵌套交易,请考虑使用 PROCEDURE。在 Postgres 11 中引入。您可以在那里管理交易:

CREATE OR REPLACE PROCEDURE aaa.proc_work(_id text, INOUT _result text = NULL)
  LANGUAGE plpgsql AS
$proc$
BEGIN
   -- optionally assert steering row exists
   PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;

   IF NOT FOUND THEN   
      RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
   END IF;

   -- try UPDATE
   UPDATE aaa.monitor
   SET    status = 'running'
   WHERE  id = _id                   -- assuming valid _id
   AND    status <> 'running';       -- assuming "status" is NOT NULL

   IF NOT FOUND THEN
      _result := 'running'; RETURN;  -- this is how you return with INOUT params
   END IF;

   COMMIT;                           -- HERE !!!

   BEGIN                             -- start new code block

      ----- code for big work HERE -----
      -- PERFORM 1/0;                -- debug: test exception?
      -- PERFORM pg_sleep(5);        -- debug: test concurrency?

      _result := '';

   -- also catching QUERY_CANCELED and ASSERT_FAILURE
   -- is a radical step to try andrelease 'running' rows no matter what
   EXCEPTION WHEN OTHERS OR QUERY_CANCELED OR ASSERT_FAILURE THEN
      -- ROLLBACK;                   -- roll back (unfinished?) big work
      _result := SQLERRM;
   END;                              -- end of nested block

   UPDATE aaa.monitor                -- final reset
   SET    status = 'idle'
   WHERE  id = _id
   AND    status <> 'idle';          -- only if needed
END
$proc$;

致电(重要!):

CALL aaa.proc_work('invoicing');  -- stand-alone call!

重要提示

UPDATE之后添加COMMIT。之后,并发事务可以看到更新的行。

但是没有额外的BEGINSTART TRANSACTIONThe manual:

In procedures invoked by the CALL command as well as in anonymous code blocks (DO command), it is possible to end transactions using the commands COMMIT and ROLLBACK. A new transaction is started automatically after a transaction is ended using these commands, so there is no separate START TRANSACTION command. (Note that BEGIN and END have different meanings in PL/pgSQL.)

我们需要一个单独的 PL/pgSQL code block, because you have a custom exception handler, and (quoting the manual):

A transaction cannot be ended inside a block with exception handlers.

(但我们可以在 EXCEPTION 处理程序中 COMMIT / ROLLBACK。)

您不能在外部事务中调用此过程,也不能与任何其他 DML 语句一起调用,这会强制使用外部事务包装器。必须是独立的 CALL。参见:

注意最后的 UPDATE aaa.monitor SET status = 'idle' WHERE ...。否则(已提交!)status 将在异常后无限期地保持 'running'。

关于从过程返回值:

我在 INOUT 参数中添加了 DEFAULT NULL,因此您不必在调用时提供参数。

UPDATE 直接。如果该行是 'running',则不会发生更新。 (这也修复了逻辑:当找到 nostatus='running' 时,您的 IF 表达式似乎向后 returns 'running'。看来你想要相反的东西。)

我添加了一个(可选!)断言以确保 table aaa.monitor 中的行存在。添加 FOR KEY SHARE 锁也可以消除断言和后续 UPDATE 之间竞争条件的微小时间 window。锁定与删除或更新 PK 列冲突 - 但 与更新 status 冲突。所以在正常操作中永远不会引发异常! The manual:

Currently, the set of columns considered for the UPDATE case are those that have a unique index on them that can be used in a foreign key (so partial indexes and expressional indexes are not considered), but this may change in the future.

SKIP LOCK 在锁冲突的情况下不等待。添加的异常永远不应该发生。只是展示一个防水的概念证明。

您的更新显示 aaa.monitor 中有 25 行,因此我添加了参数 _id

更好的方法

以上内容可能有助于保存更多信息以供全世界查看。对于队列操作,有更有效 的解决方案。使用 lock 代替,这对其他人来说是“可见的”。然后你不需要嵌套事务开始,一个普通的 FUNCTION 就可以了:

CREATE OR REPLACE FUNCTION aaa.fnc_work(_id text)
  RETURNS text
  LANGUAGE plpgsql AS
$func$
BEGIN
   -- optionally assert that the steering row exists
   PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
   IF NOT FOUND THEN   
      RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
   END IF;

   -- lock row
   PERFORM FROM aaa.monitor WHERE id = _id FOR NO KEY UPDATE SKIP LOCKED;

   IF NOT FOUND THEN
      -- we made sure the row exists, so it must be locked
      RETURN 'running';
   END IF;

   ----- code for big work HERE -----
   -- PERFORM 1/0;                -- debug: test exception?
   -- PERFORM pg_sleep(5);        -- debug: test concurrency?

   RETURN '';

EXCEPTION WHEN OTHERS THEN
   RETURN SQLERRM;

END
$func$;

致电:

SELECT aaa.fnc_work('invoicing');

调用可以任意嵌套。只要一个事务在处理大作业,就不会启动其他事务。

同样,可选断言取出一个FOR KEY SHARE锁来消除竞争条件的时间window,添加的异常在正常操作中应该永远不会发生。

为此,我们根本不需要 status 列。行锁本身就是看门人。因此 PERFORM FROM aaa.monitor ... 中的空 SELECT 列表。附带好处:这也不会通过来回更新行来产生死元组。如果你因为其他原因还需要更新status,你又回到了上一章的可见性问题。你可以结合两者...

关于PERFORM

  • SELECT or PERFORM in a PL/pgSQL function

关于行锁: