Scala 重试期货序列,直到它们全部完成
Scala retry sequence of futures until they all complete
在 Scala 中,您将如何编写一个函数来获取 Futures 序列,运行它们,不断重试任何失败的,然后 returns 结果?
例如,签名可能是:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
可配置超时的加分,此时函数失败,被调用方可以处理这种情况。
如果该错误案例处理程序可以收到失败的 Futures 列表,则可获得加分。
谢谢!
基于考虑
def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
Future
.unit
.flatMap(_ => expr).map(v => Right(v))
.recoverWith {
case _ if n > 1 => retry(expr, n - 1)
case e => Future.failed(e).recover{case e => Left(e)}
}
}
结合
Future.sequence
将 List[Future[T]]
转换为 Future[List[T]]
。但是 sequence
具有快速失败的行为,因此我们不得不将 Future[T]
提升到 Future[Either[Throwable, T]]
将这些部分放在一起我们可以定义
def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
Future.sequence(futures.map(f => retry(f.apply())))
}
并像这样使用它
val futures = List(
() => Future(42),
() => Future(throw new RuntimeException("boom 1")),
() => Future(11),
() => Future(throw new RuntimeException("boom 2"))
)
waitRetryAll(futures)
.andThen { case v => println(v) }
输出
Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))
我们可以 collect
我们的 Left
或 Right
并相应地恢复或继续处理,例如
waitRetryAll(futures)
.map(_.collect{ case v if v.isLeft => v })
...
请注意我们如何必须传入 List[() => Future[T]]
而不是 List[Future[T]]
以防止期货急切启动。
据我所知,在标准库中没有 Future
超时的实用程序。
您将如何 interrupt/cancel 在 JVM 上进行持续计算?在一般情况下,你不能,你只能在 wait
上打断 Thread
但如果它永远不会 wait
s?用于异步计算的 IO 库(定义取消)将 IO 作为一系列较小的不可中断任务执行(每个 map/flatMap 创建一个新步骤),如果它们收到 cancel/timeout 则它们将继续执行当前任务(如他们无法阻止它)但他们不会开始下一个。您可以 return 超时异常,但仍然会执行最后一步,因此如果它是一些副作用(例如数据库操作),它将在您已经 return 失败后完成。
这是不直观且棘手的,我认为这是未将此行为添加到标准库的原因。
此外,future 正在进行中,可能会产生副作用。您不能获取 Future[A]
类型的值并重新运行它。但是,您可以将 future 作为 by-name 参数传递,以便在 .recoverWith
中您可以重新创建 future。
遗憾的是你可以实现类似 "retry until the LocalDateTime.now - startTime >= " 的东西,因为这就是我认为你想要的:
def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
future.recoverWith {
case error: Throwable =>
if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
else retryHelper(future, attemptsLeft - 1, timeoutTime)
}
可以与 Future.sequence
结合创建结果列表:
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}
如果您想跟踪哪些 future 失败了,哪些 future 成功了:
def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
future.map(a => Right(a))).recover {
case error: Throwable => Left(error)
}
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}
如果您不为在 JVM 上取消期货而烦恼,并且如果您有更多类似的情况,我建议您使用一个库。
如果你想使用为你实现重试的东西,cats-retry
如果你想在定义计算时有比 Future
更好的东西(例如,不需要你使用名称参数或无效函数的东西)试试 Monix or ZIO (https://zio.dev/)
在 Scala 中,您将如何编写一个函数来获取 Futures 序列,运行它们,不断重试任何失败的,然后 returns 结果?
例如,签名可能是:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
可配置超时的加分,此时函数失败,被调用方可以处理这种情况。
如果该错误案例处理程序可以收到失败的 Futures 列表,则可获得加分。
谢谢!
基于
def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
Future
.unit
.flatMap(_ => expr).map(v => Right(v))
.recoverWith {
case _ if n > 1 => retry(expr, n - 1)
case e => Future.failed(e).recover{case e => Left(e)}
}
}
结合
Future.sequence
将 List[Future[T]]
转换为 Future[List[T]]
。但是 sequence
具有快速失败的行为,因此我们不得不将 Future[T]
提升到 Future[Either[Throwable, T]]
将这些部分放在一起我们可以定义
def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
Future.sequence(futures.map(f => retry(f.apply())))
}
并像这样使用它
val futures = List(
() => Future(42),
() => Future(throw new RuntimeException("boom 1")),
() => Future(11),
() => Future(throw new RuntimeException("boom 2"))
)
waitRetryAll(futures)
.andThen { case v => println(v) }
输出
Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))
我们可以 collect
我们的 Left
或 Right
并相应地恢复或继续处理,例如
waitRetryAll(futures)
.map(_.collect{ case v if v.isLeft => v })
...
请注意我们如何必须传入 List[() => Future[T]]
而不是 List[Future[T]]
以防止期货急切启动。
据我所知,在标准库中没有 Future
超时的实用程序。
您将如何 interrupt/cancel 在 JVM 上进行持续计算?在一般情况下,你不能,你只能在 wait
上打断 Thread
但如果它永远不会 wait
s?用于异步计算的 IO 库(定义取消)将 IO 作为一系列较小的不可中断任务执行(每个 map/flatMap 创建一个新步骤),如果它们收到 cancel/timeout 则它们将继续执行当前任务(如他们无法阻止它)但他们不会开始下一个。您可以 return 超时异常,但仍然会执行最后一步,因此如果它是一些副作用(例如数据库操作),它将在您已经 return 失败后完成。
这是不直观且棘手的,我认为这是未将此行为添加到标准库的原因。
此外,future 正在进行中,可能会产生副作用。您不能获取 Future[A]
类型的值并重新运行它。但是,您可以将 future 作为 by-name 参数传递,以便在 .recoverWith
中您可以重新创建 future。
遗憾的是你可以实现类似 "retry until the LocalDateTime.now - startTime >= " 的东西,因为这就是我认为你想要的:
def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
future.recoverWith {
case error: Throwable =>
if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
else retryHelper(future, attemptsLeft - 1, timeoutTime)
}
可以与 Future.sequence
结合创建结果列表:
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}
如果您想跟踪哪些 future 失败了,哪些 future 成功了:
def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
future.map(a => Right(a))).recover {
case error: Throwable => Left(error)
}
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}
如果您不为在 JVM 上取消期货而烦恼,并且如果您有更多类似的情况,我建议您使用一个库。
如果你想使用为你实现重试的东西,cats-retry
如果你想在定义计算时有比 Future
更好的东西(例如,不需要你使用名称参数或无效函数的东西)试试 Monix or ZIO (https://zio.dev/)