使用带有两个接收器的源并获得一个接收器的结果

Consume a source with two sinks and get the result of one sink

我想使用带有两个不同水槽的 Source

简化示例:

val source = Source(1 to 20)

val addSink = Sink.fold[Int, Int](0)(_ + _)
val subtractSink = Sink.fold[Int, Int](0)(_ - _)

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in

  bcast.out(0) ~> addSink
  bcast.out(1) ~> subtrackSink

  ClosedShape
}

RunnableGraph.fromGraph(graph).run()

val result: Future[Int] = ???

我需要能够检索 addSink 的结果。 RunnableGraph.fromGraph(graph).run() 给我 NotUsed, 但我想得到一个 Int(第一次折叠的结果 Sink)。可能吗?

将两个接收器传递给图形构建器的 create 方法,这使您可以访问它们各自的物化值:

val graph = GraphDSL.create(addSink, subtractSink)((_, _)) { implicit builder =>
  (aSink, sSink) =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in
  bcast.out(0) ~> aSink
  bcast.out(1) ~> sSink
  ClosedShape
}

val (addResult, subtractResult): (Future[Int], Future[Int]) =
  RunnableGraph.fromGraph(graph).run() 

或者,您可以放弃图形 DSL 并使用 alsoToMat:

val result: Future[Int] =
  Source(1 to 20)
    .alsoToMat(addSink)(Keep.right)
    .toMat(subtractSink)(Keep.left)
    .run()

以上给出了addSink的物化值。如果你想获得 addSinksubtractSink 的物化值,使用 Keep.both:

val (addResult, subtractResult): (Future[Int], Future[Int]) =
  Source(1 to 20)
    .alsoToMat(addSink)(Keep.right)
    .toMat(subtractSink)(Keep.both) // <--
    .run()