如何抽象效果并将 ContextShift 与 Scala Cats 一起使用?

How do I abstract over effects and use ContextShift with Scala Cats?

我正在 Scala 和 Cats 中创建一个函数,该函数执行一些 I/O 并将由代码的其他部分调用。我也在学习猫,我希望我的功能:

我假设我的所有函数在 F[_] 中都是通用的,直到 main 方法,因为我试图遵循 these Cat's guidelines

但我很难通过使用 ContextShiftExecutionContext 来使这些约束起作用。我写了一个 full example here,这是示例的摘录:

object ComplexOperation {
  // Thread pool for ComplexOperation internal use only
  val cs = IO.contextShift(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  )

  // Complex operation that takes resources and time
  def run[F[_]: Sync](input: String): F[String] =
    for {
      r1 <- Sync[F].delay(cs.shift) *> op1(input)
      r2 <- Sync[F].delay(cs.shift) *> op2(r1)
      r3 <- Sync[F].delay(cs.shift) *> op3(r2)
    } yield r3

  def op1[F[_]: Sync](input: String): F[Int]     = Sync[F].delay(input.length)
  def op2[F[_]: Sync](input: Int): F[Boolean]    = Sync[F].delay(input % 2 == 0)
  def op3[F[_]: Sync](input: Boolean): F[String] = Sync[F].delay(s"Complex result: $input")
}

这显然不会抽象过度效果,因为 ComplexOperation.run 需要 ContextShift[IO] 才能引入异步边界。这样做的正确(或最佳)方法是什么?

ComplexOperation.run 中创建 ContextShift[IO] 会使函数依赖于 IO,这是我不想要的。 在调用者上移动 ContextShift[IO] 的创建只会转移问题:调用者在 F[_] 中也是通用的,那么它如何获得 ContextShift[IO] 以传递给 ComplexOperation.run 而无需明确取决于 IO?

请记住,我不想使用在最顶层定义的全局 ContextShift[IO],但我希望每个组件自行决定。

我的 ComplexOperation.run 应该创建 ContextShift[IO] 还是调用者的责任?

我至少做对了吗?还是我违反了标准做法?

所以我冒昧地重写了您的代码,希望对您有所帮助:

import cats.effect._

object Functions {
  def sampleFunction[F[_]: Sync : ContextShift](file: String, blocker: Blocker): F[String] = {
    val handler: Resource[F, Int] =
      Resource.make(
        blocker.blockOn(openFile(file))
      ) { handler =>
        blocker.blockOn(closeFile(handler))
      }

    handler.use(handler => doWork(handler))
  }

  private def openFile[F[_]: Sync](file: String): F[Int] = Sync[F].delay {
    println(s"Opening file $file with handler 2")
    2
  }

  private def closeFile[F[_]: Sync](handler: Int): F[Unit] = Sync[F].delay {
    println(s"Closing file handler $handler")
  }

  private def doWork[F[_]: Sync](handler: Int): F[String] = Sync[F].delay {
    println(s"Calculating the value on file handler $handler")
    "The final value" 
  }
}

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val result = Blocker[IO].use { blocker =>
      Functions.sampleFunction[IO](file = "filePath", blocker)
    }

    for {
      data <- result
      _ <- IO(println(data))
    } yield ExitCode.Success
  }
}

可以看到运行宁here.

那么,这段代码的作用是什么。
首先,它为文件创建一个 Resource,因为 close 必须完成,即使在保证或失败时也是如此。
它在阻塞线程 poo 上使用 Blocker 到 运行 openclose 操作(这是使用 ContextShift).
最后,在主要方面,它为 **IO* 创建了一个默认的 Blocker,并使用它来调用您的函数;并打印结果。

欢迎提问。