带有 SQLite 的 Slick3 - 自动提交似乎不起作用

Slick3 with SQLite - autocommit seems to not be working

我正在尝试使用 Slick 为 SQLite 数据库编写一些基本查询

这是我的代码:

class MigrationLog(name: String) {

    val migrationEvents = TableQuery[MigrationEventTable]

    lazy val db: Future[SQLiteDriver.backend.DatabaseDef] = {
        val db = Database.forURL(s"jdbc:sqlite:$name.db", driver = "org.sqlite.JDBC")
        val setup = DBIO.seq(migrationEvents.schema.create)

        val createFuture = for {
          tables <- db.run(MTable.getTables)
          createResult <- if (tables.length  == 0) db.run(setup) else Future.successful()
        } yield createResult

        createFuture.map(_ => db)
    }

    val addEvent: (String, String) => Future[String] = (aggregateId, eventType) => {
        val id = java.util.UUID.randomUUID().toString
        val command = DBIO.seq(migrationEvents += (id, aggregateId, None, eventType, "CREATED", System.currentTimeMillis, None))
        db.flatMap(_.run(command).map(_ => id))
    }

    val eventSubmitted: (String, String) => Future[Unit] = (id, batchId) => {
       val q = for { e <- migrationEvents if e.id === id } yield (e.batchId, e.status, e.updatedAt)
       val updateAction = q.update(Some(batchId), "SUBMITTED", Some(System.currentTimeMillis))
       db.map(_.run(updateAction))
    }

    val eventMigrationCompleted: (String, String, String) => Future[Unit] = (batchId, id, status) => {
       val q = for { e <- migrationEvents if e.batchId === batchId && e.id === id} yield (e.status, e.updatedAt)
       val updateAction = q.update(status, Some(System.currentTimeMillis))
       db.map(_.run(updateAction))
    }

    val allEvents = () => {
        db.flatMap(_.run(migrationEvents.result))
    }
}

以下是我的使用方法:

val migrationLog = MigrationLog("test")
val events = for {
  id <- migrationLog.addEvent("aggregateUserId", "userAccessControl")
  _ <- migrationLog.eventSubmitted(id, "batchID_generated_from_idam")
  _ <- migrationLog.eventMigrationCompleted("batchID_generated_from_idam", id, "Successful")
  events <- migrationLog.allEvents()
} yield events

events.map(_.foreach(event => event match {
  case (id, aggregateId, batchId, eventType, status, submitted, updatedAt) => println(s"$id $aggregateId $batchId $eventType $status $submitted $updatedAt")
}))

想法是先添加事件,然后用 batchId 更新它(这也会更新状态),然后在作业完成时更新状态。 events 应包含状态为 Successful 的事件。

发生的事情是,在 运行 这段代码之后,它会打印状态为 SUBMITTED 的事件。如果我等了一会儿并执行相同的 allEvents 查询,或者只是使用 sqlite3 从命令行检查数据库,那么它就会正确更新。 在开始下一个操作之前,我正在正确地等待期货被解决,默认情况下应该启用自动提交。

我是不是漏掉了什么?

原来问题出在 db.map(_.run(updateAction)),其中 returns Future[Future[Int]] 这意味着当我尝试 运行 另一个查询时命令还没有完成。

将其替换为 db.flatMap(_.run(updateAction)) 解决了问题。