Scala 重试期货序列,直到它们全部完成

Scala retry sequence of futures until they all complete

在 Scala 中,您将如何编写一个函数来获取 Futures 序列,运行它们,不断重试任何失败的,然后 returns 结果?

例如,签名可能是:

  def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]

可配置超时的加分,此时函数失败,被调用方可以处理这种情况。
如果该错误案例处理程序可以收到失败的 Futures 列表,则可获得加分。

谢谢!

基于考虑

def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr).map(v => Right(v))
    .recoverWith {
      case _ if n > 1 => retry(expr, n - 1)
      case e => Future.failed(e).recover{case e => Left(e)}
    }
}

结合

Future.sequence

List[Future[T]] 转换为 Future[List[T]]。但是 sequence 具有快速失败的行为,因此我们不得不将 Future[T] 提升到 Future[Either[Throwable, T]]

将这些部分放在一起我们可以定义

def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}

并像这样使用它

val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)

waitRetryAll(futures)
  .andThen { case v => println(v) }

输出

Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))

我们可以 collect 我们的 LeftRight 并相应地恢复或继续处理,例如

waitRetryAll(futures)
  .map(_.collect{ case v if v.isLeft => v })
  ...

请注意我们如何必须传入 List[() => Future[T]] 而不是 List[Future[T]] 以防止期货急切启动。

据我所知,在标准库中没有 Future 超时的实用程序。

您将如何 interrupt/cancel 在 JVM 上进行持续计算?在一般情况下,你不能,你只能在 wait 上打断 Thread 但如果它永远不会 waits?用于异步计算的 IO 库(定义取消)将 IO 作为一系列较小的不可中断任务执行(每个 map/flatMap 创建一个新步骤),如果它们收到 cancel/timeout 则它们将继续执行当前任务(如他们无法阻止它)但他们不会开始下一个。您可以 return 超时异常,但仍然会执行最后一步,因此如果它是一些副作用(例如数据库操作),它将在您已经 return 失败后完成。

这是不直观且棘手的,我认为这是未将此行为添加到标准库的原因。

此外,future 正在进行中,可能会产生副作用。您不能获取 Future[A] 类型的值并重新运行它。但是,您可以将 future 作为 by-name 参数传递,以便在 .recoverWith 中您可以重新创建 future。

遗憾的是你可以实现类似 "retry until the LocalDateTime.now - startTime >= " 的东西,因为这就是我认为你想要的:

def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
  future.recoverWith {
    case error: Throwable =>
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
      else retryHelper(future, attemptsLeft - 1, timeoutTime)
  }

可以与 Future.sequence 结合创建结果列表:

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}

如果您想跟踪哪些 future 失败了,哪些 future 成功了:

def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a))).recover {
    case error: Throwable => Left(error)
  }

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}

如果您不为在 JVM 上取消期货而烦恼,并且如果您有更多类似的情况,我建议您使用一个库。

如果你想使用为你实现重试的东西,cats-retry

如果你想在定义计算时有比 Future 更好的东西(例如,不需要你使用名称参数或无效函数的东西)试试 Monix or ZIO (https://zio.dev/)