保持同一行并发更新的完整性

Maintain integrity on concurrent updates of the same row

在下面的代码片段中,我尝试在 2 个不同线程的 2 个不同事务中查找、删除和创建相同的项目。

在线程 1 中,我创建事务 1,找到项目,然后将其删除。

完成后,我允许线程 2 创建事务 2,并尝试查找项目。 Find() 方法在这里阻塞,因为我使用选项 FOR UPDATE.

回到线程 1,重新创建项目并提交事务 1,这允许线程 2 中的 Find() 完成。以下是出现的问题:

如果我使用隔离级别 "ReadCommitted",我会得到一个未找到的错误 - 这对我来说没有意义,因为我认为 ReadCommitted 事务可以看到更新被其他人应用。

如果我使用隔离级别 "Serializable",我会得到错误:pq: could not serialize access due to concurrent update.

为什么我会看到这种行为?我认为在第二个 find 解锁后,它应该为我提供最新的行。

我怎样才能做到当一行正在被修改时,任何其他读取都将锁定,并在其他线程完成后解锁并返回最新数据?

db, err := gorm.Open("postgres", "host=localHost port=5432 user=postgres dbname=test-rm password=postgres sslmode=disable")
if err != nil { panic("failed to connect database") }
db.SingularTable(true)
db.DropTableIfExists(&Product{})
db.AutoMigrate(&Product{})

db.Create(&Product{Code: "A", Price: 1000})
// SQL: INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

txOpt := &sql.TxOptions{Isolation: sql.LevelSerializable}

doneTrans1 := make(chan struct{})

go func(){
    item1 := &Product{}

    tx1 := db.BeginTx(context.Background(), txOpt)

    err = tx1.Set("gorm:query_option", "FOR UPDATE").Find(item1, "code = ?", "A").Error
    // SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE

    item1.Price = 3000

    err = tx1.Delete(item1).Error
    // SQL: DELETE FROM "product"  WHERE "product"."id" = 1

    doneTrans1<-struct{}{}
    time.Sleep(time.Second * 3)

    err = tx1.Create(item1).Error
    // SQL: INSERT  INTO "product" ("id","code","price") VALUES (1,'A',3000) RETURNING "product"."id"

    tx1.Commit()
}()

// Ensure other trans work started
<-doneTrans1
time.Sleep(time.Second * 2)

item2 := &Product{}

tx2 := db.BeginTx(context.Background(), txOpt)

err = tx2.Set("gorm:query_option", "FOR UPDATE").Find(item2, "code = ?", "A").Error
// SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
// ERROR occurs here

item2.Price = 5000
err = tx2.Delete(item2).Error
err = tx2.Create(item2).Error
tx2.Commit()
time.Sleep(time.Second * 5)

也许我理解错了——我以前没有用过gorm。 然而,根据您的查询评论,您的两个 goroutine 中的两个事务都有一个 "SELECT .. FOR UPDATE" 并且它们是 运行 并行的。在尝试 "SELECT.. FOR UPDATE" 相同的行之前,您的主 goroutine 没有等待在您的第二个 goroutine 中启动的事务提交。

根据您的解释,您可能错误地将 "FOR UPDATE" 包含在第二个 goroutine 中。

或者您可以在第二个 goroutine 中使用 sync.Mutex 锁并释放它 post 提交。 main goroutine 等待获取锁,然后才执行它的查询。

为了回答这个问题,我认为最好去除 goroutine 的复杂性(事实上,完全去)并专注于 SQL。以下是 SQL 语句的顺序,它们将是 运行(错误发生后我忽略了所有内容,因为这几乎无关紧要,执行顺序为 complex/variable!)。

在主程序中

INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

GoRoutine

BEGIN TX1
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
DELETE FROM "product"  WHERE "product"."id" = 1

在主程序中

BEGIN TX2
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE -- ERROR occurs here

关于你的问题。

问题一

If I use isolation level "ReadCommitted", I get a not found error - this makes no sense to me, because I thought that a ReadCommitted transaction can see updates applied by others.

来自 Read Committed Isolation Level 的文档:

UPDATE, DELETE, SELECT FOR UPDATE, and SELECT FOR SHARE commands behave the same as SELECT in terms of searching for target rows: they will only find target rows that were committed as of the command start time. However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found. In this case, the would-be updater will wait for the first updating transaction to commit or roll back (if it is still in progress). If the first updater rolls back, then its effects are negated and the second updater can proceed with updating the originally found row. If the first updater commits, the second updater will ignore the row if the first updater deleted it, otherwise it will attempt to apply its operation to the updated version of the row.

因此 TX2 中的 SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE 将等待 TX1 完成。此时 TX1 已删除产品 A,因此忽略该行并且不返回任何结果。现在我知道 TX1 也重新创建了产品 A,但请记住 "a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began;" 并且在 TX1 重新创建记录之前开始的 select 将不会被看到。

问题二

If I use isolation level "Serializable", I get the error: pq: could not serialize access due to concurrent update.

来自 Repeatable Read Isolation Level 的文档(可序列化是更高级别,因此适用这些规则以及一些更严格的规则):

UPDATE, DELETE, SELECT FOR UPDATE, and SELECT FOR SHARE commands behave the same as SELECT in terms of searching for target rows: they will only find target rows that were committed as of the transaction start time. However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found. In this case, the repeatable read transaction will wait for the first updating transaction to commit or roll back (if it is still in progress). If the first updater rolls back, then its effects are negated and the repeatable read transaction can proceed with updating the originally found row. But if the first updater commits (and actually updated or deleted the row, not just locked it) then the repeatable read transaction will be rolled back with the message

在您的代码中,TX1 更新了产品 A,这意味着 TX2 中的查询将延迟到 TX1 提交,届时它将因错误而中止(如果 TX1 回滚,则它将继续)。

如何进行第二次更新?*

维护事务完整性是一个难题,而 PostgreSQL 中的功能是一些非常聪明的人大量工作的结果。如果您发现自己与数据库作斗争,退后一步并考虑是否需要改变您的方法(或者您认为的问题是否是一个真正的问题)通常是个好主意。

在您的示例中,您有两个删除和重新创建相同记录的例程;我无法预见您希望两笔交易都进行的情况。在可能发生这种情况的真实系统中,您不会仔细安排计时器来确保首先开始一个事务。这意味着事务完成后数据库的状态将取决于哪个先到达 SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE。所以实际上,一个人是否失败并不重要(因为结果在任何情况下都是随机的);它实际上是一个更好的结果,因为您可以建议用户(他们可以检查记录并在需要时重新运行任务)。

因此,在阅读本文的其余部分之前,我建议您考虑一下这是否是一个问题(我对您要完成的工作没有背景知识,因此很难发表评论)。

如果您真的想确保更新继续进行,您有以下几种选择:

  • 如果使用 "Serializable",您需要检测失败并重试事务(如果这是业务逻辑的要求)
  • 如果使用 "Read committed" 则将 DELETE/INSERT 替换为更新(在这种情况下,PostgreSQL 将在释放第一个事务锁时重新评估 WHERE 子句)。

但是我觉得更好的方法是取消其中的大部分并尝试在单个步骤中执行这样的更新(这可能意味着绕过 ORM)。如果您想最大限度地减少此类问题的可能性,那么最大限度地减少 number/duration 锁很重要,并且在单个步骤中执行操作会有很大帮助。对于复杂的操作,使用存储过程可以加快速度,但与其他并发 运行ning 操作发生冲突的可能性仍然(减少)。

您可能还想看看 Optimistic Locking,因为在某些情况下这更有意义(例如,在您阅读信息的地方,将其显示给用户并等待更改,但与此同时另一个用户可能进行了更改)。