将流的输出提供给 Akka Streams Graph 中的广播
Feeding the output of a Flow to a Broadcast in Akka Streams Graph
我正在尝试编写 Akka 流图。我写的代码是
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(sink1, sink2) =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Row](2))
val flow = source ~> flow1 ~> flow2
flow.out ~> bcast.in
bcast.out(0) ~> sink1
bcast.out(1) ~> flow3 ~> flow4 ~> sink2
ClosedShape
})
val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)
此代码无法编译,因为我无法将输出流连接到 bcast 的输入。
我可以将源的输出连接到 bcast 的输入,但我不能这样做,因为某些部分在两个分支之间是通用的。所以我必须在 flow2
之后才在图中创建分支
另外...我不确定我是否正确地编写了图表,因为它返回了 Done 的两个 futures,我需要使用 Sequence 手动将它们组合成一个 future。
您不能分两步连接图表,因为 ~>
组合器不会返回流。它实际上是一个有状态的、声明性的操作。
此处更好的方法是一次性连接图表,例如
source ~> flow1 ~> flow2 ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
或者,您也可以通过向构建器添加阶段(并检索其形状)来拆分声明,例如
val flow2s = builder.add(flow2)
source ~> flow1 ~> flow2s.in
flow2s.out ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
关于具体化的 Future
s,您需要选择什么是有意义的作为整个图形的具体化值。如果您只需要 2 个 Sink
物化 Future
之一,则只需将那个传递给 GraphDSL.create
方法。
否则,如果您对两个 Future
都感兴趣,那么将它们 sequence
或 zip
放在一起非常有意义。
我正在尝试编写 Akka 流图。我写的代码是
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(sink1, sink2) =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Row](2))
val flow = source ~> flow1 ~> flow2
flow.out ~> bcast.in
bcast.out(0) ~> sink1
bcast.out(1) ~> flow3 ~> flow4 ~> sink2
ClosedShape
})
val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)
此代码无法编译,因为我无法将输出流连接到 bcast 的输入。
我可以将源的输出连接到 bcast 的输入,但我不能这样做,因为某些部分在两个分支之间是通用的。所以我必须在 flow2
之后才在图中创建分支另外...我不确定我是否正确地编写了图表,因为它返回了 Done 的两个 futures,我需要使用 Sequence 手动将它们组合成一个 future。
您不能分两步连接图表,因为 ~>
组合器不会返回流。它实际上是一个有状态的、声明性的操作。
此处更好的方法是一次性连接图表,例如
source ~> flow1 ~> flow2 ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
或者,您也可以通过向构建器添加阶段(并检索其形状)来拆分声明,例如
val flow2s = builder.add(flow2)
source ~> flow1 ~> flow2s.in
flow2s.out ~> bcast
bcast ~> sink1
bcast ~> flow3 ~> flow4 ~> sink2
关于具体化的 Future
s,您需要选择什么是有意义的作为整个图形的具体化值。如果您只需要 2 个 Sink
物化 Future
之一,则只需将那个传递给 GraphDSL.create
方法。
否则,如果您对两个 Future
都感兴趣,那么将它们 sequence
或 zip
放在一起非常有意义。