为什么akka-stream重启监督不重启而只是恢复
why akka-stream restart supervision doesn't restart but just resume
考虑这个简单的流:
Source(1 to 5)
.mapAsync(1) { i =>
if (i % 3 == 0) Future.failed(new Exception("I don't like 3"))
else Future.successful(i)
}
.withAttributes(
ActorAttributes.supervisionStrategy(Supervision.restartingDecider)
)
.runForeach(i => println(s"#$i"))
这实际上打印了
#1
#2
#4
这与恢复策略相同。
我希望流在未来失败后重新启动,输出如下
#1
#2
#1
#2
...
- 为什么 Resume 和 Restart 策略在这种情况下表现相同?
- 如何从头开始播放流?
问题 1:resume
和 restart
的区别在于 - 对于后者 - 失败阶段重新启动,丢失所有累积的内部状态. (参考 docs)。
在您的例子中,您有一个并行度为 1 的 mapAsync
阶段,因此您实际上永远不会有任何累积状态。这导致 resume
和 restart
在行为上是等效的。
问题2:Akka流中监督策略的语义与失败的特定阶段有关。一个失败的阶段根本无法重播过去流动的元素,因为它们已经消失了——即不在任何地方。任何监管策略都无法做到这一点。
您正在寻找的是 整个流 的重新启动,这应该可以通过 recoverWithRetries
组合器 (docs) 实现。您可以再次将相同的源 (Source(1 to 5)
) 提供给组合器,让它重播这些元素。
考虑这个简单的流:
Source(1 to 5)
.mapAsync(1) { i =>
if (i % 3 == 0) Future.failed(new Exception("I don't like 3"))
else Future.successful(i)
}
.withAttributes(
ActorAttributes.supervisionStrategy(Supervision.restartingDecider)
)
.runForeach(i => println(s"#$i"))
这实际上打印了
#1
#2
#4
这与恢复策略相同。 我希望流在未来失败后重新启动,输出如下
#1
#2
#1
#2
...
- 为什么 Resume 和 Restart 策略在这种情况下表现相同?
- 如何从头开始播放流?
问题 1:resume
和 restart
的区别在于 - 对于后者 - 失败阶段重新启动,丢失所有累积的内部状态. (参考 docs)。
在您的例子中,您有一个并行度为 1 的 mapAsync
阶段,因此您实际上永远不会有任何累积状态。这导致 resume
和 restart
在行为上是等效的。
问题2:Akka流中监督策略的语义与失败的特定阶段有关。一个失败的阶段根本无法重播过去流动的元素,因为它们已经消失了——即不在任何地方。任何监管策略都无法做到这一点。
您正在寻找的是 整个流 的重新启动,这应该可以通过 recoverWithRetries
组合器 (docs) 实现。您可以再次将相同的源 (Source(1 to 5)
) 提供给组合器,让它重播这些元素。