为什么 Akka Streams 会吞噬我的异常?

Why is Akka Streams swallowing my exceptions?

为什么异常在

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

默默无视?我可以看到流在打印 Received 1 后停止,但没有记录任何内容。请注意,问题通常不是日志记录配置,因为如果我在我的 application.conf 文件中设置 akka.log-config-on-start = on,我会看到很多输出。

我现在使用自定义 Supervision.Decider 确保异常被正确记录,可以像这样设置:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

此外,正如 Vikor Klang 所指出的,在上面给出的示例中,异常也可以是 "caught" 通过

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

但是请注意,这种方法对您没有帮助

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

因为 run() returns Unit.

我刚开始使用akk-streams的时候也有过类似的问题。 Supervision.Decider 有帮助,但并非总是如此。

不幸的是,它没有捕获 ActionPublisher 中抛出的异常。我看到它已处理,已调用 ActorPublisher.onError 但未到达 Supervision.Decider。它适用于文档中提供的简单 Stream。

如果我使用 Sink.actorRef,错误也不会到达 actor。

为了实验,我尝试了以下示例

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

在这种情况下,异常被 Decider 捕获但从未到达 actor 订阅者。

总的来说,我认为这是不一致的行为。我不能使用一种机制来处理 Stream 中的错误。

我原来的 SO 问题:Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

这是跟踪它的 akka 问题:https://github.com/akka/akka/issues/18359

我遇到了一个不同的问题,Akka Streams 吞没了我的异常。我将 post 放在这里,因为这是最高 Google 结果。

在这种情况下,sourceSource[ByteString, Any]:

source.runWith(StreamConverters.fromOutputStream(() => outputStream))

这 return 是一个 Future[IOResult]。如果写入输出流失败(例如,源失败),那么 Future 仍然会 return 成功。在这种情况下,您实际上必须检查 IOResult 中的错误:

source.runWith(StreamConverters.fromOutputStream(() => output)).
      map(ior => {
        if (!ior.wasSuccessful) 
          throw new RuntimeException(ior.getError)
      })

这样做的结果将是一个失败的 Future,并带有正确的异常。