原子 Select & 更新。使行不可见或不能被多个进程选择
Atomic Select & Update. Make row invisible or unselectable by more than one process
情况:
PostgresSQL 数据库。使用 SQL Alchemy ORM 的应用程序(不是很重要)。 Table 有数百万行。
数百个进程通过 table 访问数据库。每个人都想 select 一行并根据其内容执行相对昂贵的操作,然后填充其他 table 并更新该行。
我使用的幼稚方法是这样的:
SELECT * FROM table WHERE status = 'free';
紧接着:
UPDATE table SET status 'in_process';
现在的问题是这些操作不是原子的,这意味着在 SELECT
和 UPDATE
之间的时间内,最多 5 个其他进程可以 select 该行和开始研究它(我提醒你,这非常昂贵)。
现在我知道 SELECT FOR UPDATE
可以锁定行。但它会锁定它们 FOR UPDATE
(duh),它不会禁止行被 selected.
所以我想这一定是一个很常见的问题,但谷歌搜索并没有太大帮助。
SELECT ... FOR UPDATE
是一种很好的技术,因为它们会互相阻塞,所以在您的事务完成之前,其他具有相同意图的人无法获得您的行。
如果你想忽略被其他人锁定的行,你可以添加SKIP LOCKED
子句。
另一个可能对您有吸引力的选择是
UPDATE atable
SET status = 'in_progress'
WHERE status = 'free'
RETURNING *;
看来这是解决这个问题的方法:
使用 python 和 sqlalchemy(但这不是必需的,因为无论如何我都使用原始 SQL)
from sqlalchemy import text
sql = text("UPDATE table
SET status = 'in_process'
WHERE column.id = (SELECT column.id
FROM table
WHERE status='free'
AND pg_try_advisory_xact_lock(column.id)
LIMIT 1 FOR UPDATE)
RETURNING *"
row = next(iter(engine.execution_options(autocommit=True).execute(sql)))
# Now row is a tuple of values
情况:
PostgresSQL 数据库。使用 SQL Alchemy ORM 的应用程序(不是很重要)。 Table 有数百万行。
数百个进程通过 table 访问数据库。每个人都想 select 一行并根据其内容执行相对昂贵的操作,然后填充其他 table 并更新该行。
我使用的幼稚方法是这样的:
SELECT * FROM table WHERE status = 'free';
紧接着:
UPDATE table SET status 'in_process';
现在的问题是这些操作不是原子的,这意味着在 SELECT
和 UPDATE
之间的时间内,最多 5 个其他进程可以 select 该行和开始研究它(我提醒你,这非常昂贵)。
现在我知道 SELECT FOR UPDATE
可以锁定行。但它会锁定它们 FOR UPDATE
(duh),它不会禁止行被 selected.
所以我想这一定是一个很常见的问题,但谷歌搜索并没有太大帮助。
SELECT ... FOR UPDATE
是一种很好的技术,因为它们会互相阻塞,所以在您的事务完成之前,其他具有相同意图的人无法获得您的行。
如果你想忽略被其他人锁定的行,你可以添加SKIP LOCKED
子句。
另一个可能对您有吸引力的选择是
UPDATE atable
SET status = 'in_progress'
WHERE status = 'free'
RETURNING *;
看来这是解决这个问题的方法:
使用 python 和 sqlalchemy(但这不是必需的,因为无论如何我都使用原始 SQL)
from sqlalchemy import text
sql = text("UPDATE table
SET status = 'in_process'
WHERE column.id = (SELECT column.id
FROM table
WHERE status='free'
AND pg_try_advisory_xact_lock(column.id)
LIMIT 1 FOR UPDATE)
RETURNING *"
row = next(iter(engine.execution_options(autocommit=True).execute(sql)))
# Now row is a tuple of values