监控闭图 Akka Stream
Monitoring a closed graph Akka Stream
如果我在Akka Stream中创建了一个RunningGraph
,我怎么知道(从外面)
- 当所有节点因完成而被取消时?
- 当所有节点因错误而停止时?
我认为没有办法对任意图执行此操作,但如果您控制了图,则只需将监控接收器附加到每个可能失败或完成的节点的输出(这些节点至少有一个输出),例如:
import akka.actor.Status
// obtain graph parts (this can be done inside the graph building as well)
val source: Source[Int, NotUsed] = ...
val flow: Flow[Int, String, NotUsed] = ...
val sink: Sink[String, NotUsed] = ...
// create monitoring actors
val aggregate = actorSystem.actorOf(Props[Aggregate])
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))
// create the graph
val graph = GraphDSL.create() { implicit b =>
import GraphDSL._
val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))
val bc1 = b.add(Broadcast[Int](2))
val bc2 = b.add(Broadcast[String](2))
// main flow
source ~> bc1 ~> flow ~> bc2 ~> sink
// monitoring branches
bc1 ~> sourceMonitor
bc2 ~> flowMonitor
ClosedShape
}
// run the graph
RunnableGraph.fromGraph(graph).run()
class Monitor(name: String, aggregate: ActorRef) extends Actor {
override def receive: Receive = {
case Status.Success(_) => aggregate ! s"$name completed successfully"
case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
case _ =>
}
}
class Aggregate extends Actor {
override def receive: Receive = {
case s: String => println(s)
}
}
也可以只创建一个监控参与者并在所有监控接收器中使用它,但在这种情况下,您将无法轻松区分失败的流。
并且还有关于源和流的 watchTermination()
方法,它允许实现与流一起终止的未来。我认为它可能很难与 GraphDSL
一起使用,但对于常规流方法,它可能看起来像这样:
import akka.Done
import akka.actor.Status
import akka.pattern.pipe
val monitor = actorSystem.actorOf(Props[Monitor])
source
.watchTermination()((f, _) => f pipeTo monitor)
.via(flow).watchTermination((f, _) => f pipeTo monitor)
.to(sink)
.run()
class Monitor extends Actor {
override def receive: Receive = {
case Done => println("stream completed")
case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
}
}
您可以先转换 future,然后再将其值传递给 actor 以区分流。
如果我在Akka Stream中创建了一个RunningGraph
,我怎么知道(从外面)
- 当所有节点因完成而被取消时?
- 当所有节点因错误而停止时?
我认为没有办法对任意图执行此操作,但如果您控制了图,则只需将监控接收器附加到每个可能失败或完成的节点的输出(这些节点至少有一个输出),例如:
import akka.actor.Status
// obtain graph parts (this can be done inside the graph building as well)
val source: Source[Int, NotUsed] = ...
val flow: Flow[Int, String, NotUsed] = ...
val sink: Sink[String, NotUsed] = ...
// create monitoring actors
val aggregate = actorSystem.actorOf(Props[Aggregate])
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))
// create the graph
val graph = GraphDSL.create() { implicit b =>
import GraphDSL._
val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))
val bc1 = b.add(Broadcast[Int](2))
val bc2 = b.add(Broadcast[String](2))
// main flow
source ~> bc1 ~> flow ~> bc2 ~> sink
// monitoring branches
bc1 ~> sourceMonitor
bc2 ~> flowMonitor
ClosedShape
}
// run the graph
RunnableGraph.fromGraph(graph).run()
class Monitor(name: String, aggregate: ActorRef) extends Actor {
override def receive: Receive = {
case Status.Success(_) => aggregate ! s"$name completed successfully"
case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
case _ =>
}
}
class Aggregate extends Actor {
override def receive: Receive = {
case s: String => println(s)
}
}
也可以只创建一个监控参与者并在所有监控接收器中使用它,但在这种情况下,您将无法轻松区分失败的流。
并且还有关于源和流的 watchTermination()
方法,它允许实现与流一起终止的未来。我认为它可能很难与 GraphDSL
一起使用,但对于常规流方法,它可能看起来像这样:
import akka.Done
import akka.actor.Status
import akka.pattern.pipe
val monitor = actorSystem.actorOf(Props[Monitor])
source
.watchTermination()((f, _) => f pipeTo monitor)
.via(flow).watchTermination((f, _) => f pipeTo monitor)
.to(sink)
.run()
class Monitor extends Actor {
override def receive: Receive = {
case Done => println("stream completed")
case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
}
}
您可以先转换 future,然后再将其值传递给 actor 以区分流。