Akka Streams:如何使用流为图形成入口和出口

Akka Streams: How to form inlets and outlets for a Graph using a Flow

我有一些类似于以下的代码:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val balance = builder.add(Balance[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = Flow[Int].map(_*2)

      input ~> buffer ~> balance.in
      balance.out(0) ~> flow1 ~> zip.in0
      balance.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3

      FlowShape(input.in, flow3) //Question 2) how to make an outlet here
    })
}

请注意,我必须添加一个名为 inputBalance,因为我无法从 FlowShape 的第一个 Buffer 中检索 Inlet I想要创造。还有其他更简单的方法可以解决这个问题吗?用 1 Outlet 创建 Balance 似乎是错误的方法。

我的第二个问题类似。我无法从 flow3 检索到 Outlet。我知道解决这个问题的唯一方法是创建另一个 Balance,并将其 Outlet 公开为整个 FlowShapeOutlet。有更好的方法解决这个问题吗?

一个Balance is a fan-out shape that emits to the first available output. Considering you are zipping the flows in the next step, what you need is a Broadcast。当所有输出都可用时,它将扇出到所有输出。

此外,建造者可以添加任何 Graph 的形状,包括 Flow。您不必为此使用自定义形状。

更新后的代码:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val input = builder.add(buffer) 
      val broadcast = builder.add(Broadcast[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = builder.add(Flow[Int].map(_*2))

      input ~> broadcast.in
      broadcast.out(0) ~> flow1 ~> zip.in0
      broadcast.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3.in

      FlowShape(input.in, flow3.out) 
    })
}