如何从 Akka Stream Graph 中获取物化结果?

How to get materialised result from Akka Stream Graph?

我正在尝试弄清楚如何从 scala Akka Stream 图表中获取物化结果。

我正在使用 "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"

我查看了 docs 但找不到任何示例。

所以,假设我有一个代码

val g = FlowGraph.closed() { implicit builder=>
  import FlowGraph.Implicits._

  val in = Source.apply(1 until 10)
  val plus = Flow[Int].map(_ + 10)
  val out = Sink.fold[Seq[Int], Int](Nil){
    case (acc, num) => if (num % 2 == 0) acc :+ num else acc
  }

  in ~> plus ~> out
}

val result = g.run()

我想从图中得到结果 g 但它是 returns 单位。怎么处理?

谢谢。

编辑:您可能还想看看您是如何创建流程的……我认为 RunnableGraph 可能更适合您的情况,然后 run 会 return Mat 类型。

否则,请尝试查看手册本节中的 .mapMaterializedValue(v => ...) 调用(特别是第 40 行):

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-flows-and-basics.html#Combining_materialized_values

我想这就是你想要做的。

编辑:它也用在我发现的这个聊天应用程序中(第 54 行):

https://github.com/jrudolph/akka-http-scala-js-websocket-chat/blob/master/backend/src/main/scala/example/akkawschat/Chat.scala

看来你无法从 FlowGraph 做到这一点。您需要在外部创建所有流程,然后在 FlowGraph 内部使用它们。

val in = Source.apply(1 until 10)
val plus = Flow[Int].map(_ + 10)
val out = Sink.fold[Seq[Int], Int](Nil) {
  case (acc, num) => if (num % 2 == 0) acc :+ num else acc
}

所以,代码看起来像

val g = FlowGraph.closed(in, plus, out)((_, _, _)) { implicit builder => (src, f, dst) =>
  import FlowGraph.Implicits._
  src ~> f ~> dst
}

另一种我发现更简单的方法是

val z = in.via(plus).toMat(out)(Keep.right)