是否可以将 Scala ZIO 中的效果提升到另一个有效的上下文中?
Is it possible to lift an effect in Scala ZIO into another effectful context?
我正在寻找一种无需先在 Zio 中执行结果即可懒惰地合成两种效果的方法。我的程序采用以下形式:
/**
* Returns a reference to an effectful singleton cron scheduler backed by akka
* See https://github.com/philcali/cronish for more info on the API
*/
def scheduled: UManaged[Ref[Scheduled]] = ???
def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) =
(for {
resource <- scheduled
task <- ZManaged.fromEffect(e) // I need to lift the underlying effect here, not access its result
} yield resource.modify(schedule => schedule(job(task), crondef.cron) -> schedule)).flattenM
def scheduleEffect[A](e: => A, description: String = "")(crondef: String) =
(for {
resource <- scheduled
} yield resource.modify(schedule => schedule(job(e), crondef.cron) -> schedule)).flattenM
// Program which schedules cron jobs to increment/decrement x and y, respectively
def run(args: List[String]): URIO[ZEnv, ExitCode] = {
var x = 0
var y = 100
(for {
_ <- Scheduler.schedule(UIO({ x += 1; println(x) }), "every second")
_ <- Scheduler.scheduleEffect({ y -= 1; println(y) }, "every second")
} yield ())
.provideCustomLayer(???)
.as(ExitCode.success)
.useForever
}
在当前的公式中,y
的递减每秒运行一次,直到程序终止,而 x
的递增仅运行一次。我知道 ZIO 提供了一个 Schedule
实用程序,但出于遗留兼容性原因,我必须坚持使用 Cronish 库使用的有效单例。基本上 job
采用类型 A
的传递引用效果,并将其暂停在 CronTask
中,以便根据 crondef
定义的时间表在 Scheduled
单例中执行=].
我想知道是否有可能在 ZIO 的上下文中自己编写 效果 而不是它们的结果?我基本上已经将遗留的 cron 调度程序包装在 ZIO 数据类型中以正确管理并发性,但我仍然需要我的代码中其他 ZIO 签名方法的暂停效果可供我传递到调度程序。
我最终通过阅读 ZIO.effectAsyncM
的源代码找到了解决方案,特别注意到它对 ZIO.runtime[R]
的引用:
/**
* Imports an asynchronous effect into a pure `ZIO` value. This formulation is
* necessary when the effect is itself expressed in terms of `ZIO`.
*/
def effectAsyncM[R, E, A](
register: (ZIO[R, E, A] => Unit) => ZIO[R, E, Any]
): ZIO[R, E, A] =
for {
p <- Promise.make[E, A]
r <- ZIO.runtime[R] // This line right here!
a <- ZIO.uninterruptibleMask { restore =>
val f = register(k => r.unsafeRunAsync_(k.to(p)))
restore(f.catchAllCause(p.halt)).fork *> restore(p.await)
}
} yield a
虽然我无法在 ZIO 文档中找到对此方法的直接引用,但 Scaladocs 已经足够清楚了:
Returns an effect that accesses the runtime, which can be used to (unsafely) execute tasks. This is useful for integration with legacy code that must call back into ZIO code
.
有了这个,我的新实现效果如下:
def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) =
(for {
resource <- scheduled
runtime <- ZManaged.fromEffect(ZIO.runtime[R])
} yield resource.modify({
schedule => schedule(job(runtime.unsafeRun(e)), crondef.cron) -> schedule
})).flattenM
我正在寻找一种无需先在 Zio 中执行结果即可懒惰地合成两种效果的方法。我的程序采用以下形式:
/**
* Returns a reference to an effectful singleton cron scheduler backed by akka
* See https://github.com/philcali/cronish for more info on the API
*/
def scheduled: UManaged[Ref[Scheduled]] = ???
def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) =
(for {
resource <- scheduled
task <- ZManaged.fromEffect(e) // I need to lift the underlying effect here, not access its result
} yield resource.modify(schedule => schedule(job(task), crondef.cron) -> schedule)).flattenM
def scheduleEffect[A](e: => A, description: String = "")(crondef: String) =
(for {
resource <- scheduled
} yield resource.modify(schedule => schedule(job(e), crondef.cron) -> schedule)).flattenM
// Program which schedules cron jobs to increment/decrement x and y, respectively
def run(args: List[String]): URIO[ZEnv, ExitCode] = {
var x = 0
var y = 100
(for {
_ <- Scheduler.schedule(UIO({ x += 1; println(x) }), "every second")
_ <- Scheduler.scheduleEffect({ y -= 1; println(y) }, "every second")
} yield ())
.provideCustomLayer(???)
.as(ExitCode.success)
.useForever
}
在当前的公式中,y
的递减每秒运行一次,直到程序终止,而 x
的递增仅运行一次。我知道 ZIO 提供了一个 Schedule
实用程序,但出于遗留兼容性原因,我必须坚持使用 Cronish 库使用的有效单例。基本上 job
采用类型 A
的传递引用效果,并将其暂停在 CronTask
中,以便根据 crondef
定义的时间表在 Scheduled
单例中执行=].
我想知道是否有可能在 ZIO 的上下文中自己编写 效果 而不是它们的结果?我基本上已经将遗留的 cron 调度程序包装在 ZIO 数据类型中以正确管理并发性,但我仍然需要我的代码中其他 ZIO 签名方法的暂停效果可供我传递到调度程序。
我最终通过阅读 ZIO.effectAsyncM
的源代码找到了解决方案,特别注意到它对 ZIO.runtime[R]
的引用:
/**
* Imports an asynchronous effect into a pure `ZIO` value. This formulation is
* necessary when the effect is itself expressed in terms of `ZIO`.
*/
def effectAsyncM[R, E, A](
register: (ZIO[R, E, A] => Unit) => ZIO[R, E, Any]
): ZIO[R, E, A] =
for {
p <- Promise.make[E, A]
r <- ZIO.runtime[R] // This line right here!
a <- ZIO.uninterruptibleMask { restore =>
val f = register(k => r.unsafeRunAsync_(k.to(p)))
restore(f.catchAllCause(p.halt)).fork *> restore(p.await)
}
} yield a
虽然我无法在 ZIO 文档中找到对此方法的直接引用,但 Scaladocs 已经足够清楚了:
Returns an effect that accesses the runtime, which can be used to (unsafely) execute tasks. This is useful for integration with legacy code that must call back into ZIO code
.
有了这个,我的新实现效果如下:
def schedule[R, E, A](e: => ZIO[R, E, A], crondef: String) =
(for {
resource <- scheduled
runtime <- ZManaged.fromEffect(ZIO.runtime[R])
} yield resource.modify({
schedule => schedule(job(runtime.unsafeRun(e)), crondef.cron) -> schedule
})).flattenM