将流的输出提供给 Akka Streams Graph 中的广播

Feeding the output of a Flow to a Broadcast in Akka Streams Graph

我正在尝试编写 Akka 流图。我写的代码是

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
   (sink1, sink2) =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Row](2))
      val flow = source ~> flow1 ~> flow2
      flow.out ~> bcast.in
      bcast.out(0) ~> sink1
      bcast.out(1) ~> flow3 ~> flow4 ~> sink2
      ClosedShape
})

val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)

此代码无法编译,因为我无法将输出流连接到 bcast 的输入。

我可以将源的输出连接到 bcast 的输入,但我不能这样做,因为某些部分在两个分支之间是通用的。所以我必须在 flow2

之后才在图中创建分支

另外...我不确定我是否正确地编写了图表,因为它返回了 Done 的两个 futures,我需要使用 Sequence 手动将它们组合成一个 future。

您不能分两步连接图表,因为 ~> 组合器不会返回流。它实际上是一个有状态的、声明性的操作。

此处更好的方法是一次性连接图表,例如

  source ~> flow1 ~> flow2 ~> bcast
                              bcast          ~>          sink1
                              bcast ~> flow3 ~> flow4 ~> sink2

或者,您也可以通过向构建器添加阶段(并检索其形状)来拆分声明,例如

  val flow2s = builder.add(flow2)

  source ~> flow1 ~> flow2s.in
  flow2s.out ~> bcast
                bcast          ~>          sink1
                bcast ~> flow3 ~> flow4 ~> sink2

关于具体化的 Futures,您需要选择什么是有意义的作为整个图形的具体化值。如果您只需要 2 个 Sink 物化 Future 之一,则只需将那个传递给 GraphDSL.create 方法。 否则,如果您对两个 Future 都感兴趣,那么将它们 sequencezip 放在一起非常有意义。