Akka 流:如何仅在失败时使用退避策略?
Akka streams: how to use backoff strategy only in case of failure?
我有一个相当复杂的 Source source
,我已将其包装到 Backoff 中以便在出现故障时能够重新启动流:
val backoffSource = RestartSource.withBackoff(settings){() => source}
我link这个源到汇,它有一个物化值Future[Done]
:
val future = backoffSource.toMat(sink)(Keep.right).run()
之后,如果此图成功,我想在图完成后执行一些操作。所以,我想做一些事情:
future.onComplete{
case Failure(ex) => println(ex)
case Success(_) => println("Success")
}
但是根据文档 RestartSource
是
starting an operator again when it fails or completes, each time with a growing time delay between restarts
所以我的图表将不断重新启动,直到达到 maxRestarts
。
如何实现下一个行为:
- 只有在失败的情况下才会应用退避策略
- 如果图形成功它将调用我的
onComplete
方法 Success(_)
- 如果
maxRestarts
失败,将调用 onComplete
和 Failure(_)
?
使用RestartSource.onFailuresWithBackoff:
Wrap the given Source with a Source that will restart it when it fails using an exponential backoff.
从对源的检查来看,如果包装的 Source
失败到足以导致整体失败,则流将失败,但最后一次失败除外。
我有一个相当复杂的 Source source
,我已将其包装到 Backoff 中以便在出现故障时能够重新启动流:
val backoffSource = RestartSource.withBackoff(settings){() => source}
我link这个源到汇,它有一个物化值Future[Done]
:
val future = backoffSource.toMat(sink)(Keep.right).run()
之后,如果此图成功,我想在图完成后执行一些操作。所以,我想做一些事情:
future.onComplete{
case Failure(ex) => println(ex)
case Success(_) => println("Success")
}
但是根据文档 RestartSource
是
starting an operator again when it fails or completes, each time with a growing time delay between restarts
所以我的图表将不断重新启动,直到达到 maxRestarts
。
如何实现下一个行为:
- 只有在失败的情况下才会应用退避策略
- 如果图形成功它将调用我的
onComplete
方法Success(_)
- 如果
maxRestarts
失败,将调用onComplete
和Failure(_)
?
使用RestartSource.onFailuresWithBackoff:
Wrap the given Source with a Source that will restart it when it fails using an exponential backoff.
从对源的检查来看,如果包装的 Source
失败到足以导致整体失败,则流将失败,但最后一次失败除外。