如何让 Scalaz ZIO 变得懒惰?

How do I make a Scalaz ZIO lazy?

我有一个严重的副作用函数(想想数据库调用),我想将其用作惰性值,这样它只会在第一次使用时被调用(如果从未使用过则根本不会被调用)。

如何使用 ZIO 执行此操作?

如果我的程序看起来像这样,函数只被调用一次(但即使结果根本没有被使用):

import scalaz.zio.IO
import scalaz.zio.console._

object Main extends scalaz.zio.App {

  def longRunningDbAction: IO[Nothing, Integer] = for {
    _ <- putStrLn("Calling the database now")
  } yield 42

  def maybeUseTheValue(x: Integer): IO[Nothing, Unit] = for {
    _ <- putStrLn(s"The database said ${x}")
  } yield ()

  def maybeNeedItAgain(x: Integer): IO[Nothing, Unit] = for {
    _ <- putStrLn("Okay, we did not need it again here.")
  } yield ()

 override def run(args: List[String]): IO[Nothing, Main.ExitStatus] = for {
    valueFromDb <- longRunningDbAction
    _ <- maybeUseTheValue(valueFromDb)
    _ <- maybeNeedItAgain(valueFromDb)
  } yield ExitStatus.ExitNow(0)

}

我想我必须传递一个 IO 来生成 Int 而不是已经具体化的 Int,但是如果我传递原始的 IO 那只是调用数据库,会重复调用:

object Main extends scalaz.zio.App {

  def longRunningDbAction: IO[Nothing, Integer] = for {
    _ <- putStrLn("Calling the database now")
  } yield 42


  def maybeUseTheValue(x: IO[Nothing, Integer]): IO[Nothing, Unit] = for {
    gettingItNow <- x
    _ <- putStrLn(s"The database said ${gettingItNow}")
  } yield ()

  def maybeNeedItAgain(x: IO[Nothing, Integer]): IO[Nothing, Unit] = for {
    gettingItNow <- x
    _ <- putStrLn(s"Okay, we need it again here: ${gettingItNow}")
  } yield ()

  override def run(args: List[String]): IO[Nothing, Main.ExitStatus] = for {
    _ <- maybeUseTheValue(longRunningDbAction)
    _ <- maybeNeedItAgain(longRunningDbAction)
  } yield ExitStatus.ExitNow(0)

}

有没有办法 "wrap" 将 longRunningDbAction 变成让它变得懒惰的东西?

我得出以下结论:

 def lazyIO[E,A](io: IO[E,A]): IO[Nothing, IO[E, A]] = {
    for {
      barrier <- Promise.make[Nothing, Unit]
      fiber <- (barrier.get *> io).fork
    } yield barrier.complete(()) *> putStrLn("getting it") *> fiber.join
  }

ZIO 1.0-RC4 的更新版本(具有环境支持)

def lazyIO[R, E, A](io: ZIO[R, E, A]): ZIO[R, Nothing, ZIO[R, E, A]] = {
  for {
    barrier <- Promise.make[Nothing, Unit]
    fiber <- (barrier.await *> io).fork
  } yield barrier.succeed(()) *> fiber.join
}

所以这是一个接受 IO 和 returns 它的惰性版本的 IO。

它的工作原理是启动一个运行原始 iofiber,但只有在 Promise (barrier) 完成之后。

惰性 IO 首先完成 barrier(如果它是第一个执行此操作的人,它将解锁 fiber,进而运行包装的 io),然后加入fiber 检索计算结果。

有了这个,我可以做到

override def run(args: List[String]): IO[Nothing, Main.ExitStatus] = for {
    valueFromDb <- lazyIO(longRunningDbAction)
    _ <- maybeUseTheValue(valueFromDb)
    _ <- maybeNeedItAgain(valueFromDb)
  } yield ExitStatus.ExitNow(0)

并且控制台输出显示延迟值确实被拉取了两次,但只有第一个触发了 "database access":

getting it
Calling the database now
The database said 42
getting it
Okay, we need it again here: 42

ZIO 现在有 memoize

override def run(args: List[String]): IO[Nothing, Main.ExitStatus] = for {
   valueFromDb <- ZIO.memoize(longRunningDbAction)
   _ <- maybeUseTheValue(valueFromDb)
   _ <- maybeNeedItAgain(valueFromDb)
} yield ExitStatus.ExitNow(0)

它与 做的事情基本相同:来源看起来像这样

/**
   * Returns an effect that, if evaluated, will return the lazily computed result
   * of this effect.
   */
  final def memoize: ZIO[R, Nothing, IO[E, A]] =
    for {
      r <- ZIO.environment[R]
      p <- Promise.make[E, A]
      l <- Promise.make[Nothing, Unit]
      _ <- (l.await *> ((self provide r) to p)).fork
    } yield l.succeed(()) *> p.await