Akka 流:如何仅在失败时使用退避策略?

Akka streams: how to use backoff strategy only in case of failure?

我有一个相当复杂的 Source source,我已将其包装到 Backoff 中以便在出现故障时能够重新启动流:

val backoffSource = RestartSource.withBackoff(settings){() => source}

我link这个源到汇,它有一个物化值Future[Done]

val future = backoffSource.toMat(sink)(Keep.right).run()

之后,如果此图成功,我想在图完成后执行一些操作。所以,我想做一些事情:

future.onComplete{
  case Failure(ex) => println(ex)
  case Success(_) => println("Success")
}

但是根据文档 RestartSource

starting an operator again when it fails or completes, each time with a growing time delay between restarts

所以我的图表将不断重新启动,直到达到 maxRestarts

如何实现下一个行为:

  1. 只有在失败的情况下才会应用退避策略
  2. 如果图形成功它将调用我的 onComplete 方法 Success(_)
  3. 如果 maxRestarts 失败,将调用 onCompleteFailure(_)

使用RestartSource.onFailuresWithBackoff:

Wrap the given Source with a Source that will restart it when it fails using an exponential backoff.

从对源的检查来看,如果包装的 Source 失败到足以导致整体失败,则流将失败,但最后一次失败除外。