如何在for-comprehension中正确使用服务层的IO和OptionT?

How to properly use IO and OptionT in service layer in for-comprehension?

我有一个带有 CRUD 操作的简单存储库接口(可能,将隐式会话作为一般特征中的参数传递是个坏主意):

trait Repository[Entity, PK] {
  def find(pk: PK)(implicit session: DBSession): OptionT[IO, Entity]

  def insert(e: Entity)(implicit session: DBSession): IO[Entity]

  def update(e: Entity)(implicit session: DBSession): IO[Entity]

  def delete(pk: PK)(implicit session: DBSession): IO[Int]

  def findAll()(implicit session: DBSession): IO[List[Entity]]
}

我想这样使用它:

for {
  _ <- repository.insert(???)
  _ <- repository.delete(???)
  v <- repository.find(???).value
  _ <- someFunctionReliesOnReturnedValue(v)
} yield (???)

此外,如果 v 是 None 我想停止执行,如果有任何错误则回滚事务(我使用 scalikejdbc)。因此,正如我所想,我必须像这样在我的服务层中执行此操作(+将其包装到 Try 或类似的东西中以处理业务异常):

def logic(???) = {
  DB localTx {
    implicit session => {
      (for {
        _ <- repository.insert(???)
        _ <- repository.delete(???)
        v <- repository.find(???).value
        _ <- someFunctionReliesOnReturnedValue(v)
      } yield (???)).unsafeRunSync() // to rollback transaction if there is any error
    }
  }
}

问题在这里:someFunctionReliesOnReturnedValue(v)。它可以是接受 Entity 而不是 Option[Entity] 的任意函数。如何将 OptionT[IO, Entity] 的结果转换为 IO[Entity] 并保存 Option[] 的语义? 这是正确的方法还是我在某处弄错了?


import java.nio.file.{Files, Paths}

import cats.data.OptionT
import cats.effect.IO
import scalikejdbc._

import scala.util.Try

case class Entity(id: Long, value: String)

object Entity extends SQLSyntaxSupport[Entity] {
  override def tableName: String = "entity"

  override def columnNames: Seq[String] = Seq("id", "value")

  def apply(g: SyntaxProvider[Entity])(rs: WrappedResultSet): Entity = apply(g.resultName)(rs)

  def apply(r: ResultName[Entity])(rs: WrappedResultSet): Entity =
    Entity(rs.long(r.id), rs.string(r.value))
}

trait Repository[Entity, PK] {
  def find(pk: PK)(implicit session: DBSession): OptionT[IO, Entity]

  def insert(e: Entity)(implicit session: DBSession): IO[Entity]
}

class EntityRepository extends Repository[Entity, Long] {
  private val alias = Entity.syntax("entity")

  override def find(pk: Long)(implicit session: DBSession): OptionT[IO, Entity] = OptionT{
    IO{
      withSQL {
        select(alias.resultAll).from(Entity as alias).where.eq(Entity.column.id, pk)
      }.map(Entity(alias.resultName)(_)).single().apply()
    }
  }

  override def insert(e: Entity)(implicit session: DBSession): IO[Entity] = IO{
    withSQL {
      insertInto(Entity).namedValues(
        Entity.column.id -> e.id,
        Entity.column.value -> e.value,
      )
    }.update().apply()
    e
  }
}

object EntityRepository {
  def apply(): EntityRepository = new EntityRepository()
}

object Util {
  def createFile(value: String): IO[Unit] = IO(Files.createDirectory(Paths.get("path", value)))
}

class Service {
  val repository = EntityRepository()

  def logic(): Either[Throwable, Unit] = Try {
    DB localTx {
      implicit session => {
        val result: IO[Unit] = for {
          _ <- repository.insert(Entity(1, "1"))
          _ <- repository.insert(Entity(2, "2"))
          e <- repository.find(3)
          _ <- Util.createFile(e.value) // error
          //after this step there is possible more steps (another insert or find)
        } yield ()
        result.unsafeRunSync()
      }
    }
  }.toEither
}

object Test extends App {
  ConnectionPool.singleton("jdbc:postgresql://localhost:5433/postgres", "postgres", "")
  val service = new Service()
  service.logic()
}

Table:

create table entity (id numeric(38), value varchar(255));

我遇到了编译错误:

Error:(69, 13) type mismatch; found : cats.effect.IO[Unit] required: cats.data.OptionT[cats.effect.IO,?] _ <- Util.createFile(e.value)

一般来说,您应该将所有不同的结果转换为具有 monad 的 "most general" 类型。在这种情况下,这意味着您应该通过将所有这些 IO[Entity] 转换为 OptionT[IO, Entity] 并使用 OptionT.liftF:

在整个理解过程中使用 OptionT[IO, A]
for {
  _ <- OptionT.liftF(repository.insert(???))
  _ <- OptionT.liftF(repository.delete(???))
  v <- repository.find(???)
  _ <- someFunctionReliesOnReturnedValue(v)
} yield (???)

如果你有 Option[A],你可以使用 OptionT.fromOption[IO]。问题来自试图在同一个 for-comprehension 中混合 monads。

如果其中任何一个导致 None,这将已经停止执行。至于回滚事务,这取决于你的数据库交互库如何工作,但如果它通过回滚处理异常,那么是的,unsafeRunSync 将起作用。如果您还希望它在结果为 None 时通过抛出异常来回滚,您可以执行以下操作:

val result: OptionT[IO, ...] = ...
result.value.unsafeRunSync().getOrElse(throw new FooException(...))