如何在函数内部强制执行 COMMIT,以便其他会话可以看到更新的行?
How to force COMMIT inside function so other sessions can see updated row?
在 Postgres 12 数据库中,我在一个函数中有多个查询(SELECT
、UPDATE
、...),这些查询总共需要大约 20 分钟才能完成。
如果 status
不是 运行:
,我在顶部有一个检查 UPDATE
create or replace function aaa.fnc_work() returns varchar as
$body$
begin
if (select count(*) from aaa.monitor where id='invoicing' and status='running')=0 then
return 'running';
else
update aaa.monitor set status='running' where id='invoicing';
end if;
--- rest of code ---
--finally
update aaa.monitor set status='idle' where id='invoicing';
return '';
exception when others then
return SQLERRM::varchar;
end
$body$
language plpgsql;
这个想法是为了防止其他用户在 status
空闲之前执行 --- rest of code ---
。
但是,其他人(调用相同的函数)似乎看不到更新的状态,他们也继续并开始执行 --- rest of code ---
。如何在以下时间后强制提交:
update aaa.monitor set status='running' where id='invoicing'
;
以便所有其他用户会话可以看到更新后的 status
并相应地退出。
我需要交易吗?
您要完成的是自治事务。 PostgreSQL 没有简单的方法来做到这一点。 link here 讨论了一些备选方案。
但是上面 linked 文章中讨论的一种方法是使用 PostgreSQL dblink 扩展。
您需要将扩展程序添加到服务器
CREATE EXTENSION dblink;
然后您可以创建一个从您的函数中调用的新函数
CREATE FUNCTION update_monitor_via_dblink(msg text)
RETURNS void
LANGUAGE sql
AS $function$
select dblink('host=/var/run/postgresql port=5432 user=postgres dbname=postgres',
format(' update aaa.monitor set status= %M',msg::text)
$function$;
您可能要考虑的另一件事是使用 PostgreSQL 锁。可以找到更多信息 here。
继续阅读。我把最好的留到最后。
概念验证 PROCEDURE
Postgres FUNCTION
始终是原子的(在单个事务包装器中运行)并且不能处理事务。所以 COMMIT
是不允许的。您 可以 使用 dblink
的技巧来解决这个问题。参见:
- Does Postgres support nested or autonomous transactions?
- How do I do large non-blocking updates in PostgreSQL?
但对于像这样的嵌套交易,请考虑使用 PROCEDURE
。在 Postgres 11 中引入。您可以在那里管理交易:
CREATE OR REPLACE PROCEDURE aaa.proc_work(_id text, INOUT _result text = NULL)
LANGUAGE plpgsql AS
$proc$
BEGIN
-- optionally assert steering row exists
PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
IF NOT FOUND THEN
RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
END IF;
-- try UPDATE
UPDATE aaa.monitor
SET status = 'running'
WHERE id = _id -- assuming valid _id
AND status <> 'running'; -- assuming "status" is NOT NULL
IF NOT FOUND THEN
_result := 'running'; RETURN; -- this is how you return with INOUT params
END IF;
COMMIT; -- HERE !!!
BEGIN -- start new code block
----- code for big work HERE -----
-- PERFORM 1/0; -- debug: test exception?
-- PERFORM pg_sleep(5); -- debug: test concurrency?
_result := '';
-- also catching QUERY_CANCELED and ASSERT_FAILURE
-- is a radical step to try andrelease 'running' rows no matter what
EXCEPTION WHEN OTHERS OR QUERY_CANCELED OR ASSERT_FAILURE THEN
-- ROLLBACK; -- roll back (unfinished?) big work
_result := SQLERRM;
END; -- end of nested block
UPDATE aaa.monitor -- final reset
SET status = 'idle'
WHERE id = _id
AND status <> 'idle'; -- only if needed
END
$proc$;
致电(重要!):
CALL aaa.proc_work('invoicing'); -- stand-alone call!
重要提示
在UPDATE
之后添加COMMIT
。之后,并发事务可以看到更新的行。
但是没有额外的BEGIN
或START TRANSACTION
。 The manual:
In procedures invoked by the CALL
command as well as in anonymous code
blocks (DO
command), it is possible to end transactions using the
commands COMMIT
and ROLLBACK
. A new transaction is started
automatically after a transaction is ended using these commands, so
there is no separate START TRANSACTION
command. (Note that BEGIN
and
END
have different meanings in PL/pgSQL.)
我们需要一个单独的 PL/pgSQL code block, because you have a custom exception handler, and (quoting the manual):
A transaction cannot be ended inside a block with exception handlers.
(但我们可以在 EXCEPTION
处理程序中 COMMIT
/ ROLLBACK
。)
您不能在外部事务中调用此过程,也不能与任何其他 DML 语句一起调用,这会强制使用外部事务包装器。必须是独立的 CALL
。参见:
注意最后的 UPDATE aaa.monitor SET status = 'idle' WHERE ...
。否则(已提交!)status
将在异常后无限期地保持 'running'。
关于从过程返回值:
我在 INOUT
参数中添加了 DEFAULT NULL
,因此您不必在调用时提供参数。
UPDATE
直接。如果该行是 'running',则不会发生更新。 (这也修复了逻辑:当找到 no 行 status='running'
时,您的 IF
表达式似乎向后 returns 'running'。看来你想要相反的东西。)
我添加了一个(可选!)断言以确保 table aaa.monitor
中的行存在。添加 FOR KEY SHARE
锁也可以消除断言和后续 UPDATE
之间竞争条件的微小时间 window。锁定与删除或更新 PK 列冲突 - 但 不 与更新 status
冲突。所以在正常操作中永远不会引发异常! The manual:
Currently, the set of columns considered for the UPDATE
case are
those that have a unique index on them that can be used in a foreign
key (so partial indexes and expressional indexes are not considered),
but this may change in the future.
SKIP LOCK
在锁冲突的情况下不等待。添加的异常永远不应该发生。只是展示一个防水的概念证明。
您的更新显示 aaa.monitor
中有 25 行,因此我添加了参数 _id
。
更好的方法
以上内容可能有助于保存更多信息以供全世界查看。对于队列操作,有更有效 的解决方案。使用 lock 代替,这对其他人来说是“可见的”。然后你不需要嵌套事务开始,一个普通的 FUNCTION
就可以了:
CREATE OR REPLACE FUNCTION aaa.fnc_work(_id text)
RETURNS text
LANGUAGE plpgsql AS
$func$
BEGIN
-- optionally assert that the steering row exists
PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
IF NOT FOUND THEN
RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
END IF;
-- lock row
PERFORM FROM aaa.monitor WHERE id = _id FOR NO KEY UPDATE SKIP LOCKED;
IF NOT FOUND THEN
-- we made sure the row exists, so it must be locked
RETURN 'running';
END IF;
----- code for big work HERE -----
-- PERFORM 1/0; -- debug: test exception?
-- PERFORM pg_sleep(5); -- debug: test concurrency?
RETURN '';
EXCEPTION WHEN OTHERS THEN
RETURN SQLERRM;
END
$func$;
致电:
SELECT aaa.fnc_work('invoicing');
调用可以任意嵌套。只要一个事务在处理大作业,就不会启动其他事务。
同样,可选断言取出一个FOR KEY SHARE
锁来消除竞争条件的时间window,添加的异常在正常操作中应该永远不会发生。
为此,我们根本不需要 status
列。行锁本身就是看门人。因此 PERFORM FROM aaa.monitor ...
中的空 SELECT
列表。附带好处:这也不会通过来回更新行来产生死元组。如果你因为其他原因还需要更新status
,你又回到了上一章的可见性问题。你可以结合两者...
关于PERFORM
:
- SELECT or PERFORM in a PL/pgSQL function
关于行锁:
在 Postgres 12 数据库中,我在一个函数中有多个查询(SELECT
、UPDATE
、...),这些查询总共需要大约 20 分钟才能完成。
如果 status
不是 运行:
UPDATE
create or replace function aaa.fnc_work() returns varchar as
$body$
begin
if (select count(*) from aaa.monitor where id='invoicing' and status='running')=0 then
return 'running';
else
update aaa.monitor set status='running' where id='invoicing';
end if;
--- rest of code ---
--finally
update aaa.monitor set status='idle' where id='invoicing';
return '';
exception when others then
return SQLERRM::varchar;
end
$body$
language plpgsql;
这个想法是为了防止其他用户在 status
空闲之前执行 --- rest of code ---
。
但是,其他人(调用相同的函数)似乎看不到更新的状态,他们也继续并开始执行 --- rest of code ---
。如何在以下时间后强制提交:
update aaa.monitor set status='running'
where id='invoicing'
;
以便所有其他用户会话可以看到更新后的 status
并相应地退出。
我需要交易吗?
您要完成的是自治事务。 PostgreSQL 没有简单的方法来做到这一点。 link here 讨论了一些备选方案。
但是上面 linked 文章中讨论的一种方法是使用 PostgreSQL dblink 扩展。
您需要将扩展程序添加到服务器
CREATE EXTENSION dblink;
然后您可以创建一个从您的函数中调用的新函数
CREATE FUNCTION update_monitor_via_dblink(msg text)
RETURNS void
LANGUAGE sql
AS $function$
select dblink('host=/var/run/postgresql port=5432 user=postgres dbname=postgres',
format(' update aaa.monitor set status= %M',msg::text)
$function$;
您可能要考虑的另一件事是使用 PostgreSQL 锁。可以找到更多信息 here。
继续阅读。我把最好的留到最后。
概念验证 PROCEDURE
Postgres FUNCTION
始终是原子的(在单个事务包装器中运行)并且不能处理事务。所以 COMMIT
是不允许的。您 可以 使用 dblink
的技巧来解决这个问题。参见:
- Does Postgres support nested or autonomous transactions?
- How do I do large non-blocking updates in PostgreSQL?
但对于像这样的嵌套交易,请考虑使用 PROCEDURE
。在 Postgres 11 中引入。您可以在那里管理交易:
CREATE OR REPLACE PROCEDURE aaa.proc_work(_id text, INOUT _result text = NULL)
LANGUAGE plpgsql AS
$proc$
BEGIN
-- optionally assert steering row exists
PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
IF NOT FOUND THEN
RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
END IF;
-- try UPDATE
UPDATE aaa.monitor
SET status = 'running'
WHERE id = _id -- assuming valid _id
AND status <> 'running'; -- assuming "status" is NOT NULL
IF NOT FOUND THEN
_result := 'running'; RETURN; -- this is how you return with INOUT params
END IF;
COMMIT; -- HERE !!!
BEGIN -- start new code block
----- code for big work HERE -----
-- PERFORM 1/0; -- debug: test exception?
-- PERFORM pg_sleep(5); -- debug: test concurrency?
_result := '';
-- also catching QUERY_CANCELED and ASSERT_FAILURE
-- is a radical step to try andrelease 'running' rows no matter what
EXCEPTION WHEN OTHERS OR QUERY_CANCELED OR ASSERT_FAILURE THEN
-- ROLLBACK; -- roll back (unfinished?) big work
_result := SQLERRM;
END; -- end of nested block
UPDATE aaa.monitor -- final reset
SET status = 'idle'
WHERE id = _id
AND status <> 'idle'; -- only if needed
END
$proc$;
致电(重要!):
CALL aaa.proc_work('invoicing'); -- stand-alone call!
重要提示
在UPDATE
之后添加COMMIT
。之后,并发事务可以看到更新的行。
但是没有额外的BEGIN
或START TRANSACTION
。 The manual:
In procedures invoked by the
CALL
command as well as in anonymous code blocks (DO
command), it is possible to end transactions using the commandsCOMMIT
andROLLBACK
. A new transaction is started automatically after a transaction is ended using these commands, so there is no separateSTART TRANSACTION
command. (Note thatBEGIN
andEND
have different meanings in PL/pgSQL.)
我们需要一个单独的 PL/pgSQL code block, because you have a custom exception handler, and (quoting the manual):
A transaction cannot be ended inside a block with exception handlers.
(但我们可以在 EXCEPTION
处理程序中 COMMIT
/ ROLLBACK
。)
您不能在外部事务中调用此过程,也不能与任何其他 DML 语句一起调用,这会强制使用外部事务包装器。必须是独立的 CALL
。参见:
注意最后的 UPDATE aaa.monitor SET status = 'idle' WHERE ...
。否则(已提交!)status
将在异常后无限期地保持 'running'。
关于从过程返回值:
我在 INOUT
参数中添加了 DEFAULT NULL
,因此您不必在调用时提供参数。
UPDATE
直接。如果该行是 'running',则不会发生更新。 (这也修复了逻辑:当找到 no 行 status='running'
时,您的 IF
表达式似乎向后 returns 'running'。看来你想要相反的东西。)
我添加了一个(可选!)断言以确保 table aaa.monitor
中的行存在。添加 FOR KEY SHARE
锁也可以消除断言和后续 UPDATE
之间竞争条件的微小时间 window。锁定与删除或更新 PK 列冲突 - 但 不 与更新 status
冲突。所以在正常操作中永远不会引发异常! The manual:
Currently, the set of columns considered for the
UPDATE
case are those that have a unique index on them that can be used in a foreign key (so partial indexes and expressional indexes are not considered), but this may change in the future.
SKIP LOCK
在锁冲突的情况下不等待。添加的异常永远不应该发生。只是展示一个防水的概念证明。
您的更新显示 aaa.monitor
中有 25 行,因此我添加了参数 _id
。
更好的方法
以上内容可能有助于保存更多信息以供全世界查看。对于队列操作,有更有效 的解决方案。使用 lock 代替,这对其他人来说是“可见的”。然后你不需要嵌套事务开始,一个普通的 FUNCTION
就可以了:
CREATE OR REPLACE FUNCTION aaa.fnc_work(_id text)
RETURNS text
LANGUAGE plpgsql AS
$func$
BEGIN
-- optionally assert that the steering row exists
PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
IF NOT FOUND THEN
RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
END IF;
-- lock row
PERFORM FROM aaa.monitor WHERE id = _id FOR NO KEY UPDATE SKIP LOCKED;
IF NOT FOUND THEN
-- we made sure the row exists, so it must be locked
RETURN 'running';
END IF;
----- code for big work HERE -----
-- PERFORM 1/0; -- debug: test exception?
-- PERFORM pg_sleep(5); -- debug: test concurrency?
RETURN '';
EXCEPTION WHEN OTHERS THEN
RETURN SQLERRM;
END
$func$;
致电:
SELECT aaa.fnc_work('invoicing');
调用可以任意嵌套。只要一个事务在处理大作业,就不会启动其他事务。
同样,可选断言取出一个FOR KEY SHARE
锁来消除竞争条件的时间window,添加的异常在正常操作中应该永远不会发生。
为此,我们根本不需要 status
列。行锁本身就是看门人。因此 PERFORM FROM aaa.monitor ...
中的空 SELECT
列表。附带好处:这也不会通过来回更新行来产生死元组。如果你因为其他原因还需要更新status
,你又回到了上一章的可见性问题。你可以结合两者...
关于PERFORM
:
- SELECT or PERFORM in a PL/pgSQL function
关于行锁: