自定义处理 Akka Streams Flow 中的 Future 失败

Custom handling Future failures in Akka Streams Flow

我尝试在 Akka Streams 中连接多个 Flow,并根据 Flow 以不同的方式处理它们的错误。它可以这样使用某物来完成:

Flow[String, Either[ProcessingError, String], NotUsed]

然后根据任一值将响应转移到错误处理程序。

我的问题是,一些 Flows return Future[String] 而不是 String 我不知道如何评估它以便能够在每个 Flow 之后捕获错误并以自定义方式处理它。

要将 Future 转换为 Either 而不会导致流失败,您可以使用

.mapAsync(1){ e => 
  val f: Future[T] = ...
  f.transformWith(_.toEither)
}

mapAsyncmapAsyncUnordered 是在 Akka Streams 中评估期货的惯用方法。请注意,那些失败的未来将使流失败,要处理流中的错误,您需要对未来作出反应 "immediately" 以将其转换为 TryEither.