监控 akka 流的生命周期

monitoring the lifecyle of an akka stream

我想监控 akka-stream 的生命周期,看起来 monitor 会做我需要的,但我的监控函数是异步的,返回一个 Future,所以我需要监视器也是异步的。

monitor 具有以下签名:

def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2]

但我需要这样的东西:

def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2]

有没有办法使用像 mapAsync.

这样的 akka-streams 原语来实现这个

我想我可以使用 mapAsync + watchTermination,但当 monitor 几乎可以满足我的需要时,这似乎是一个复杂的解决方案。

事实证明 monitor 根本不是我想要的,因为 a 只能在流实现后才能访问 FlowMonitor

我最终用 mapAsyncrecover 实现了这个。我在这里进行了简化,但是是这样的:

val monitor = new Monitor {
  def onNext: Future[Unit] = ???
  def onFailure(cause: Throwable): Future[Unit] = ???
  def onFinish: Future[Unit] = ???
}

source.mapAsync { v => 
  monitor.onNext.map(_ => v)
}.watchTermination() { (mat, doneF) =>
  doneF.flatMap(_ => monitor.onFinish).recoverWith( case ex => monitor.onFailure(ex))
}