使用 slick 避免 postgres 更新中的竞争条件

Avoiding race condition in postgres updates using slick

case class Item(id: String, count: Int).  

class ItemRepo(db: Database) {
  val query = TableQuery[ItemTable]


def updateAmount(id: String, incCount :Int) = {
   val currentRow = db.run(query.filter(_.id === id).result).head
   val updatedRow =  Item(currentRow.id, currentRow.count + incCount)
   db.run((query returning query).insertOrUpdate(updatedRow))
}

上面的代码有一个竞争条件 - 如果两个线程 运行 这并行它们可能都读取相同的计数,并且只有最后一个更新线程会增加它们的 incCount。

我怎样才能避免这种情况?我尝试在执行 query.filter 的行中使用 .forUpdate 但它不会阻塞其他线程。我错过了什么吗?

当您从数据库中获取数据时,您应该使用 SELECT ... FOR UPDATE,以便您在该行上拥有排他锁,以防止其他会话在您的事务完成之前更新数据。

在 Slick 中,您可以使用 forUpdate construct available since version 3.2.0

您可以使用一些技巧来改善这种情况。

首先,您要向数据库发送两个独立的查询(两个 db.run 调用)。您可以通过将它们组合成一个动作并将其发送到数据库来改进它。例如:

// Danger: I've not tried to compile this. Please excuse typos.

val lookupAction = query.filter(_.id === id).result


val updateAction = lookupAction.flatMap { matchingRows =>
   val newItem = matchingRows.headOption match {
      case Some(Item(_, count)) => Item(id, count + incCount)
      case None => Item(id, 1) // or whatever your default is 
   }
   (query returning query).insertOrUpdate(newItem)
}

// and you db.run(updateAction.transactionally)

根据您的数据库的事务保证,这会给您一些帮助。我提到它是因为在 Slick 中组合动作是一个重要的概念。这样,您的 forUpdate(Laurenz Albe 指出)可能会按预期运行。

但是,您可能更愿意向数据库发送更新。您需要使用 Slick 的 Plain SQL 功能来执行此操作:

val action = sqlu"UPDATE items SET count = count + $incCount WHERE id = $id"
// And then you db.run(action)

...并允许您的数据库处理并发(取决于数据库隔离级别)。

如果您真的想在所有客户端执行此操作,在 JVM 上的 Scala 代码中,有并发概念,例如锁、参与者和 refs。 Slick 本身没有任何东西可以为您执行 JVM 锁定。