在产生特定值之前,如何重复执行 Zio 计划?

How can I recur a Zio schedule until a particular value is produced?

假设我有一个 IO(因此不安全)操作将 return truefalse。我想用Zio的调度机制来执行这个直到值为true,但是最多只能执行N次。采用 the documentation 中的代码并将其更改为我要实现的目标...

import zio._
import zio.duration._
import zio.console._
import zio.clock._
import java.util.Random

object API {
  // our API method will return true about 30% of the time, but
  // return false the rest of the time (instead of throwing an
  // exception, as is shown in documentation)
  def makeRequest: Task[Boolean] = Task.effect {
    new Random().nextInt(10) > 7
  }
}

object ScheduleUtil {
  def schedule[A] = Schedule.spaced(1.second) && Schedule.recurs(4).onDecision({
    case Decision.Done(_)                 => putStrLn(s"done trying")
    case Decision.Continue(attempt, _, _) => putStrLn(s"attempt #$attempt")
  })
}

import ScheduleUtil._
import API._

object ScheduleApp extends scala.App {

  implicit val rt: Runtime[Clock with Console] = Runtime.default

  rt.unsafeRun(makeRequest.retry(schedule).foldM(
    ex => putStrLn("Exception Failed"),
    v => putStrLn(s"Succeeded with $v"))
  )
}

// run the app
ScheduleApp.main(Array())

这当然行不通。输出是 Succeeded with false 或(偶尔)Succeeded with true。我尝试将 Schedule.recurUntilEquals 添加到 Schedule 定义中,但无济于事。

object ScheduleUtil {
    def schedule[A] = Schedule.spaced(1.second) && Schedule.recurUntilEquals(true) && Schedule.recurs(4).onDecision({
      case Decision.Done(_)                 => putStrLn(s"done trying")
      case Decision.Continue(attempt, _, _) => putStrLn(s"attempt #$attempt")
    })
  }

import ScheduleUtil._

// re-define ScheduleApp in the exact same way as above, and the following error results:

cmd93.sc:5: polymorphic expression cannot be instantiated to expected type;
 found   : [A]zio.Schedule[zio.console.Console,Boolean,((Long, Boolean), Long)]
    (which expands to)  [A]zio.Schedule[zio.Has[zio.console.Console.Service],Boolean,((Long, Boolean), Long)]
 required: zio.Schedule[?,Throwable,?]
  rt.unsafeRun(makeRequest.retry(schedule).foldM(

如何使用 Zio 调度程序完成这样的用例?当然,我可以重新定义 makeRequest 任务以故意抛出异常,而不是 returning false,这与文档中的一样有效。但我希望避免不必要的异常 generation/handling.

object API {
    // our API method will return true about 30% of the time, but
    // return false the rest of the time (instead of throwing an
    // exception, as is shown in documentation)
    def makeRequest = Task.effect {
      if (new Random().nextInt(10) > 7) true else throw new Exception("Not true")
    }
  }

你的问题是你在 effect 上使用 retry 而不是你想要的 repeat 因为你将明确绕过你提到的错误通道。

所以只需将 makeRequest.retry(schedule) 更改为 makeRequest.repeat(schedule) 就可以了。

有关更详细的说明,请考虑以下签名:

// Schedule.spaced
def spaced(duration: Duration): Schedule[Any, Any, Long]

// Schedule.recurs
def recurs(n: Int): Schedule[Any, Any, Long]

// Schedule.recurUntilEquals
def recurUntilEquals[A](a: => A): Schedule[Any, A, A]

Schedule有三个类型参数,-Env-In+OutEnv和标准的R一样属于 ZIO 的类型,但 InOut 不同于其他 ZIO 类型的标准 EA。这是因为根据文档,Schedule 描述了“循环计划,它消耗类型 In 的值,以及 returns 类型 Out 的值”。对于 spacedrecurs 输入是 Any 表示它将接受任何输入值并且通过扩展也不限制该值。你可以通过将两者组合在一起来看到这一点:

val s: Schedule[Any, Any, (Long, Long)] = Schedule.spaced(1.second) && Schedule.recurs(1)

这也是为什么它作为 retry 的一部分使用时不会导致任何编译器错误的原因,因为它们在不使用它时对错误通道没有任何特定要求. 但是 这也隐藏了您的问题,因为 retry 仅在出现 错误时才使用时间表 但是因为您要返回 truefalse 您最终没有收到错误,并且您的计划从未被调用。

添加 recurUntilEquals 后,将向计划添加一个输入约束:

val s: Schedule[Any, Boolean, ((Long, Long), Boolean)] = Schedule.spaced(1.second) && Schedule.recurs(1) && Schedule.recurUntilEquals(true)

现在你说应该输入 Schedule 的输入实际上是一个布尔值,但 retry 具有签名:

def retry[R1 <: R, S](policy: Schedule[R1, E, S])(implicit ev: CanFail[E]): ZIO[R1 with Clock, E, A]

注意 policy 参数中 Schedule 的第二个位置是 E 参数,它是错误类型,因为 Throwable =!= Boolean 结果出现编译器错误。

对应的是repeat

的签名
def repeat[R1 <: R, B](schedule: Schedule[R1, A, B]): ZIO[R1 with Clock, E, B]

这里我们看到 schedule 实际上采用 A 类型,在这种情况下,这将是您的 API 或 Boolean 的响应,与您的匹配期待您提供的时间表。

我使用的是 ZIO.repeatWhile(task)(condition),它非常适合我的情况。