Akka 流图恢复问题
Akka Stream Graph recover issue
我创建了一个图表来并行化具有相同输入的两个流程。流产生 Future[Option[Entity]]。如果 flowA 失败,我想 return 一个 Future[None] 但 recover 似乎没有被调用
val graph: Flow[Input, (Future[Option[Entity]], Future[Option[Entity]]), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Input](2))
val zip = builder.add(Zip[Future[Option[Entity]], Future[Option[Entity]]])
val flowAwithRecovery = flowA.recover{ case t: Throwable =>
logger.error(t, "Error retrieving output from flowA. Resuming without them.")
Future.successful(None)
}
broadcast.out(0) ~> flowAwithRecovery ~> zip.in0
broadcast.out(1) ~> flowB ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
当我 运行 图表和流程 A return 是一个失败的未来时,恢复不会被执行。作为一种解决方法,我在处理结束时恢复了 Future,但我想在设计图形时加入这种逻辑。
当从上游传播异常时,recover
组合器开始发挥作用。
Future.failed
不是异常,而是有效元素。
你需要像
这样的东西
flowA.map(_.recover{ case t: Throwable =>
logger.error(t, "Error retrieving output from flowA. Resuming without them.")
None
})
换句话说,您真的需要在流中传递 Future
s 吗?您最好在构建 flowA
和 flowB
时使用 mapAsync
并让它们只生成 Option[Entity]
.
我创建了一个图表来并行化具有相同输入的两个流程。流产生 Future[Option[Entity]]。如果 flowA 失败,我想 return 一个 Future[None] 但 recover 似乎没有被调用
val graph: Flow[Input, (Future[Option[Entity]], Future[Option[Entity]]), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Input](2))
val zip = builder.add(Zip[Future[Option[Entity]], Future[Option[Entity]]])
val flowAwithRecovery = flowA.recover{ case t: Throwable =>
logger.error(t, "Error retrieving output from flowA. Resuming without them.")
Future.successful(None)
}
broadcast.out(0) ~> flowAwithRecovery ~> zip.in0
broadcast.out(1) ~> flowB ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
当我 运行 图表和流程 A return 是一个失败的未来时,恢复不会被执行。作为一种解决方法,我在处理结束时恢复了 Future,但我想在设计图形时加入这种逻辑。
当从上游传播异常时,recover
组合器开始发挥作用。
Future.failed
不是异常,而是有效元素。
你需要像
flowA.map(_.recover{ case t: Throwable =>
logger.error(t, "Error retrieving output from flowA. Resuming without them.")
None
})
换句话说,您真的需要在流中传递 Future
s 吗?您最好在构建 flowA
和 flowB
时使用 mapAsync
并让它们只生成 Option[Entity]
.