1 个事务中的 Doobie 和 DB 访问组合

Doobie and DB access composition within 1 transaction

Doobie book 表示从存储库层 return ConnectionIO 是一个好习惯。它提供了链接调用并在一个事务中执行它们的能力。 很好很清楚。

现在假设我们正在开发 REST API 服务,我们的场景是:

  1. 在数据库中查找对象
  2. 对该对象执行一些异步操作(使用 cats.effect.IO 或 monix.eval.Task)。
  3. 将对象存储在数据库中。

我们想在 1 个交易中执行所有这些步骤。问题是,如果没有 transactor.trans() 为我们提供的自然转换,我们将在 2 个单子中工作 - TaskConnectionIO。那是不可能的。

问题是 - 如何将 doobie ConnectionIO 与 1 个组合中的任何效果 monad 混合,例如我们在 1 个事务中工作并且能够 commit/rollback 在世界尽头的所有 DB 突变?

谢谢!

更新: 小例子

def getObject: ConnectionIO[Request]                      = ???
def saveObject(obj: Request): ConnectionIO[Request]       = ???
def processObject(obj: Request): monix.eval.Task[Request] = ???

val transaction:??? = for {
    obj       <- getObject             //ConnectionIO[Request]
    processed <- processObject(obj)    //monix.eval.Task[Request]
    updated   <- saveObject(processed) //ConnectionIO[Request]
  } yield updated

UPD2:@oleg-pyzhcov 提供的正确答案是将效果数据类型提升为 ConnectionIO,如下所示:

def getObject: ConnectionIO[Request]                      = ???
def saveObject(obj: Request): ConnectionIO[Request]       = ???
def processObject(obj: Request): monix.eval.Task[Request] = ???

val transaction: ConnectionIO[Request] = for {
    obj       <- getObject                                           //ConnectionIO[Request]
    processed <- Async[ConnectionIO].liftIO(processObject(obj).toIO) //ConnectionIO[Request]
    updated   <- saveObject(processed)                               //ConnectionIO[Request]
} yield updated
val result: Task[Request] = transaction.transact(xa)

ConnectionIO in doobie has a cats.effect.Async instance,除其他外,它允许您通过 liftIO 方法将任何 cats.effect.IO 转换为 ConnectionIO

import doobie.free.connection._
import cats.effect.{IO, Async}
val catsIO: IO[String] = ???
val cio: ConnectionIO[String] = Async[ConnectionIO].liftIO(catsIO)

对于 monix.eval.Task,您最好的选择是使用 Task#toIO 并执行相同的技巧,但您需要范围内的 monix Scheduler