自定义处理 Akka Streams Flow 中的 Future 失败
Custom handling Future failures in Akka Streams Flow
我尝试在 Akka Streams 中连接多个 Flow,并根据 Flow 以不同的方式处理它们的错误。它可以这样使用某物来完成:
Flow[String, Either[ProcessingError, String], NotUsed]
然后根据任一值将响应转移到错误处理程序。
我的问题是,一些 Flows return Future[String] 而不是 String 我不知道如何评估它以便能够在每个 Flow 之后捕获错误并以自定义方式处理它。
要将 Future
转换为 Either
而不会导致流失败,您可以使用
.mapAsync(1){ e =>
val f: Future[T] = ...
f.transformWith(_.toEither)
}
mapAsync
和 mapAsyncUnordered
是在 Akka Streams 中评估期货的惯用方法。请注意,那些失败的未来将使流失败,要处理流中的错误,您需要对未来作出反应 "immediately" 以将其转换为 Try
或 Either
.
我尝试在 Akka Streams 中连接多个 Flow,并根据 Flow 以不同的方式处理它们的错误。它可以这样使用某物来完成:
Flow[String, Either[ProcessingError, String], NotUsed]
然后根据任一值将响应转移到错误处理程序。
我的问题是,一些 Flows return Future[String] 而不是 String 我不知道如何评估它以便能够在每个 Flow 之后捕获错误并以自定义方式处理它。
要将 Future
转换为 Either
而不会导致流失败,您可以使用
.mapAsync(1){ e =>
val f: Future[T] = ...
f.transformWith(_.toEither)
}
mapAsync
和 mapAsyncUnordered
是在 Akka Streams 中评估期货的惯用方法。请注意,那些失败的未来将使流失败,要处理流中的错误,您需要对未来作出反应 "immediately" 以将其转换为 Try
或 Either
.