是否可以将 Scala ZIO 中的效果提升到另一个有效的上下文中?

Is it possible to lift an effect in Scala ZIO into another effectful context?

我正在寻找一种无需先在 Zio 中执行结果即可懒惰地合成两种效果的方法。我的程序采用以下形式:

/**
  * Returns a reference to an effectful singleton cron scheduler backed by akka
  * See https://github.com/philcali/cronish for more info on the API
  */
def scheduled: UManaged[Ref[Scheduled]] = ???

def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) = 
  (for {
    resource <- scheduled
    task     <- ZManaged.fromEffect(e) // I need to lift the underlying effect here, not access its result
  } yield resource.modify(schedule => schedule(job(task), crondef.cron) -> schedule)).flattenM

def scheduleEffect[A](e: => A, description: String = "")(crondef: String) =
  (for {
    resource <- scheduled
  } yield resource.modify(schedule => schedule(job(e), crondef.cron) -> schedule)).flattenM

// Program which schedules cron jobs to increment/decrement x and y, respectively
def run(args: List[String]): URIO[ZEnv, ExitCode] = {
  var x = 0
  var y = 100
  (for {
    _ <- Scheduler.schedule(UIO({ x += 1; println(x) }), "every second")
    _ <- Scheduler.scheduleEffect({ y -= 1; println(y) }, "every second")
  } yield ())
    .provideCustomLayer(???)
    .as(ExitCode.success)
    .useForever
  }

在当前的公式中,y 的递减每秒运行一次,直到程序终止,而 x 的递增仅运行一次。我知道 ZIO 提供了一个 Schedule 实用程序,但出于遗留兼容性原因,我必须坚持使用 Cronish 库使用的有效单例。基本上 job 采用类型 A 的传递引用效果,并将其暂停在 CronTask 中,以便根据 crondef 定义的时间表在 Scheduled 单例中执行=].

我想知道是否有可能在 ZIO 的上下文中自己编写 效果 而不是它们的结果?我基本上已经将遗留的 cron 调度程序包装在 ZIO 数据类型中以正确管理并发性,但我仍然需要我的代码中其他 ZIO 签名方法的暂停效果可供我传递到调度程序。

我最终通过阅读 ZIO.effectAsyncM 的源代码找到了解决方案,特别注意到它对 ZIO.runtime[R] 的引用:

/**
 * Imports an asynchronous effect into a pure `ZIO` value. This formulation is
 * necessary when the effect is itself expressed in terms of `ZIO`.
 */
def effectAsyncM[R, E, A](
  register: (ZIO[R, E, A] => Unit) => ZIO[R, E, Any]
): ZIO[R, E, A] =
  for {
    p <- Promise.make[E, A]
    r <- ZIO.runtime[R]         // This line right here!
    a <- ZIO.uninterruptibleMask { restore =>
           val f = register(k => r.unsafeRunAsync_(k.to(p)))

           restore(f.catchAllCause(p.halt)).fork *> restore(p.await)
         }
  } yield a

虽然我无法在 ZIO 文档中找到对此方法的直接引用,但 Scaladocs 已经足够清楚了:

Returns an effect that accesses the runtime, which can be used to (unsafely) execute tasks. This is useful for integration with legacy code that must call back into ZIO code.

有了这个,我的新实现效果如下:

def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) = 
  (for {
    resource <- scheduled
    runtime  <- ZManaged.fromEffect(ZIO.runtime[R])
  } yield resource.modify({
    schedule => schedule(job(runtime.unsafeRun(e)), crondef.cron) -> schedule
  })).flattenM