锁定 PostgreSQL 中的所有行 table

Lock all the rows in the PostgreSQL table

我正在尝试在 PG table 中创建某种任务队列,与此类似 https://www.pgcon.org/2016/schedule/attachments/414_queues-pgcon-2016.pdf 但有点复杂。

1) 有一些任务与某个entity_id关联,当entity_id不同时,它们可以并行执行。所以对他们来说有一个 table:

create table entity_tasks (
  entity_id bigint,
  task text,
  inserted_at timestamp default now()
);

2) 有些任务必须单独执行,即与所有其他任务顺序执行。对于此类任务,还有一个 table:

create table block_everything_tasks (
  task TEXT,
  inserted_at TIMESTAMP DEFAULT NOW()
);

执行来自 block_everything_tasks 的任务应该阻止来自 entity_tasksblock_everything_tasks 的所有任务的执行。

经过一些原型制作后,我还添加了一个 table

create table entities_for_tasks (
  entity_id bigint primary key
);

获取和执行每个实体的任务是这样的:

begin;
    select entity_id into entity_to_lock
    from entities_for_tasks
    for update skip locked
    limit 1;

    select * from entity_tasks
    where entity_id = entity_to_lock
    order by inserted_at
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

到目前为止一切顺利,但是当我尝试从 block_everything_tasks 执行抓取任务时,它变得很尴尬。我在这里看到了一些解决方案,但不喜欢其中任何一个。

1) 我可以像这样 entity_to_lock table 明确地锁定整个

begin;
    lock table entity_to_lock;

    select * from block_everything_tasks
    order by inserted_at
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

但这会阻止向 entity_to_lock 的任务添加行,并且可能会阻止向其中一个队列添加任务。

2) 或者我可以尝试做这样的事情

begin;
    with lock as (
      select * from entity_to_lock for update
    )
    select * from block_everything_tasks
    order by inserted_at
    for update skip locked
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

它看起来像是一个不错的解决方案,我不阻止提交者而且 entity_to_lock 也不是太大,但我不使用 entity_to_lock 中的行而且它们不是已锁定,所以它不起作用。

所以我的问题是

INSERTUPDATE 都获得一个 ROW EXCLUSIVE 锁,所以你不会发现任何 table 级别的锁只排除一个而不排除另一个。

您可以使用 SELECT FOR UPDATE 锁定所有 现有 行以防止更改,但它不会同时影响 INSERTed 记录,因此它们会仍会被拾取和处理,无论当前有什么任务 运行ning.

保持 entities_for_tasks table 与 entity_tasks 同步也可能存在问题,具体取决于您填充它的方式以及您 isolation level 的内容重新使用;这种模式在低于 SERIALIZABLE.

时容易出现竞争条件

退一步说,您确实有两个截然不同的问题需要解决:创建和分配任务,以及协调任务的执行。第一个问题由基本排队机制完美处理,但试图通过重载相同机制来解决第二个问题似乎是所有这些冲突的根源。

所以,不要管队列,想想你实际还需要什么来协调任务执行:

  1. 一把锁,上面写着 "a task is running"
  2. 一组锁上面写着"a task is running against entity x"

...来自 block_everything_tasks 的任务需要对 (1) 的独占锁定,而来自 entity_tasks 的任务可以共享对 (1) 的锁定,但需要独占锁定 (2).

最明确的实现方式是通过 advisory locks,它允许您 "lock" 具有特定应用程序含义的任意整数。

假设没有实体具有 ID 0,让我们将其用于顶级 "task is running" 锁。然后,从队列中成功拉取一个任务后,每个独占任务都会 运行:

SELECT pg_advisory_xact_lock(0);

...每个实体任务将 运行:

SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(<entity_id of selected task>);

建议锁定的主要问题是数据库的每个用户都需要就这些整数的含义达成一致,否则他们可能最终会出于不相关的目的争用同一个锁。锁定函数的双参数 (int,int) 重载允许您将锁定范围限定为特定用例,但是当您的 ID 为 bigint 时,这并没有多大帮助。

如果您不能确定您是数据库中唯一使用建议锁的人,您可以使用基于 table 的方法来模拟这一点。设置 table:

CREATE_TABLE currently_processing (
  entity_id bigint PRIMARY KEY
);

...然后独享任务:

LOCK currently_processing;

...对于每个实体的任务:

INSERT INTO currently_processing VALUES (<entity_id of selected task>);
<run the task>
DELETE FROM currently_processing WHERE entity_id = <entity_id of selected task>;

INSERT会尝试获取table上的共享锁(被排他任务阻塞),PRIMARY KEY上的唯一索引会导致并发INSERTs 相同 ID 阻塞,直到冲突事务提交或回滚。