为什么 Akka Streams 会吞噬我的异常?
Why is Akka Streams swallowing my exceptions?
为什么异常在
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
object TestExceptionHandling {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()(defaultActorSystem)
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.runForeach { i =>
println(s"Received $i")
}
}
}
默默无视?我可以看到流在打印 Received 1
后停止,但没有记录任何内容。请注意,问题通常不是日志记录配置,因为如果我在我的 application.conf
文件中设置 akka.log-config-on-start = on
,我会看到很多输出。
我现在使用自定义 Supervision.Decider
确保异常被正确记录,可以像这样设置:
val decider: Supervision.Decider = { e =>
logger.error("Unhandled exception in stream", e)
Supervision.Stop
}
implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
此外,正如 Vikor Klang 所指出的,在上面给出的示例中,异常也可以是 "caught" 通过
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.runForeach { i =>
println(s"Received $i")
}.onComplete {
case Success(_) =>
println("Done")
case Failure(e) =>
println(s"Failed with $e")
}
但是请注意,这种方法对您没有帮助
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.to(Sink.foreach { i =>
println(s"Received $i")
}).run()
因为 run()
returns Unit
.
我刚开始使用akk-streams的时候也有过类似的问题。 Supervision.Decider
有帮助,但并非总是如此。
不幸的是,它没有捕获 ActionPublisher
中抛出的异常。我看到它已处理,已调用 ActorPublisher.onError
但未到达 Supervision.Decider
。它适用于文档中提供的简单 Stream。
如果我使用 Sink.actorRef
,错误也不会到达 actor。
为了实验,我尝试了以下示例
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
在这种情况下,异常被 Decider 捕获但从未到达 actor 订阅者。
总的来说,我认为这是不一致的行为。我不能使用一种机制来处理 Stream 中的错误。
我原来的 SO 问题:Custom Supervision.Decider doesn't catch exception produced by ActorPublisher
这是跟踪它的 akka 问题:https://github.com/akka/akka/issues/18359
我遇到了一个不同的问题,Akka Streams 吞没了我的异常。我将 post 放在这里,因为这是最高 Google 结果。
在这种情况下,source
是 Source[ByteString, Any]
:
source.runWith(StreamConverters.fromOutputStream(() => outputStream))
这 return 是一个 Future[IOResult]。如果写入输出流失败(例如,源失败),那么 Future 仍然会 return 成功。在这种情况下,您实际上必须检查 IOResult 中的错误:
source.runWith(StreamConverters.fromOutputStream(() => output)).
map(ior => {
if (!ior.wasSuccessful)
throw new RuntimeException(ior.getError)
})
这样做的结果将是一个失败的 Future,并带有正确的异常。
为什么异常在
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
object TestExceptionHandling {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()(defaultActorSystem)
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.runForeach { i =>
println(s"Received $i")
}
}
}
默默无视?我可以看到流在打印 Received 1
后停止,但没有记录任何内容。请注意,问题通常不是日志记录配置,因为如果我在我的 application.conf
文件中设置 akka.log-config-on-start = on
,我会看到很多输出。
我现在使用自定义 Supervision.Decider
确保异常被正确记录,可以像这样设置:
val decider: Supervision.Decider = { e =>
logger.error("Unhandled exception in stream", e)
Supervision.Stop
}
implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
此外,正如 Vikor Klang 所指出的,在上面给出的示例中,异常也可以是 "caught" 通过
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.runForeach { i =>
println(s"Received $i")
}.onComplete {
case Success(_) =>
println("Done")
case Failure(e) =>
println(s"Failed with $e")
}
但是请注意,这种方法对您没有帮助
Source(List(1, 2, 3)).map { i =>
if (i == 2) {
throw new RuntimeException("Please, don't swallow me!")
} else {
i
}
}.to(Sink.foreach { i =>
println(s"Received $i")
}).run()
因为 run()
returns Unit
.
我刚开始使用akk-streams的时候也有过类似的问题。 Supervision.Decider
有帮助,但并非总是如此。
不幸的是,它没有捕获 ActionPublisher
中抛出的异常。我看到它已处理,已调用 ActorPublisher.onError
但未到达 Supervision.Decider
。它适用于文档中提供的简单 Stream。
如果我使用 Sink.actorRef
,错误也不会到达 actor。
为了实验,我尝试了以下示例
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
在这种情况下,异常被 Decider 捕获但从未到达 actor 订阅者。
总的来说,我认为这是不一致的行为。我不能使用一种机制来处理 Stream 中的错误。
我原来的 SO 问题:Custom Supervision.Decider doesn't catch exception produced by ActorPublisher
这是跟踪它的 akka 问题:https://github.com/akka/akka/issues/18359
我遇到了一个不同的问题,Akka Streams 吞没了我的异常。我将 post 放在这里,因为这是最高 Google 结果。
在这种情况下,source
是 Source[ByteString, Any]
:
source.runWith(StreamConverters.fromOutputStream(() => outputStream))
这 return 是一个 Future[IOResult]。如果写入输出流失败(例如,源失败),那么 Future 仍然会 return 成功。在这种情况下,您实际上必须检查 IOResult 中的错误:
source.runWith(StreamConverters.fromOutputStream(() => output)).
map(ior => {
if (!ior.wasSuccessful)
throw new RuntimeException(ior.getError)
})
这样做的结果将是一个失败的 Future,并带有正确的异常。