监控 akka 流的生命周期
monitoring the lifecyle of an akka stream
我想监控 akka-stream 的生命周期,看起来 monitor 会做我需要的,但我的监控函数是异步的,返回一个 Future
,所以我需要监视器也是异步的。
monitor
具有以下签名:
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2]
但我需要这样的东西:
def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2]
有没有办法使用像 mapAsync
.
这样的 akka-streams 原语来实现这个
我想我可以使用 mapAsync
+ watchTermination
,但当 monitor
几乎可以满足我的需要时,这似乎是一个复杂的解决方案。
事实证明 monitor
根本不是我想要的,因为 a 只能在流实现后才能访问 FlowMonitor
。
我最终用 mapAsync
和 recover
实现了这个。我在这里进行了简化,但是是这样的:
val monitor = new Monitor {
def onNext: Future[Unit] = ???
def onFailure(cause: Throwable): Future[Unit] = ???
def onFinish: Future[Unit] = ???
}
source.mapAsync { v =>
monitor.onNext.map(_ => v)
}.watchTermination() { (mat, doneF) =>
doneF.flatMap(_ => monitor.onFinish).recoverWith( case ex => monitor.onFailure(ex))
}
我想监控 akka-stream 的生命周期,看起来 monitor 会做我需要的,但我的监控函数是异步的,返回一个 Future
,所以我需要监视器也是异步的。
monitor
具有以下签名:
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2]
但我需要这样的东西:
def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2]
有没有办法使用像 mapAsync
.
我想我可以使用 mapAsync
+ watchTermination
,但当 monitor
几乎可以满足我的需要时,这似乎是一个复杂的解决方案。
事实证明 monitor
根本不是我想要的,因为 a 只能在流实现后才能访问 FlowMonitor
。
我最终用 mapAsync
和 recover
实现了这个。我在这里进行了简化,但是是这样的:
val monitor = new Monitor {
def onNext: Future[Unit] = ???
def onFailure(cause: Throwable): Future[Unit] = ???
def onFinish: Future[Unit] = ???
}
source.mapAsync { v =>
monitor.onNext.map(_ => v)
}.watchTermination() { (mat, doneF) =>
doneF.flatMap(_ => monitor.onFinish).recoverWith( case ex => monitor.onFailure(ex))
}