Akka Streams:如何使用流为图形成入口和出口
Akka Streams: How to form inlets and outlets for a Graph using a Flow
我有一些类似于以下的代码:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val balance = builder.add(Balance[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = Flow[Int].map(_*2)
input ~> buffer ~> balance.in
balance.out(0) ~> flow1 ~> zip.in0
balance.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3
FlowShape(input.in, flow3) //Question 2) how to make an outlet here
})
}
请注意,我必须添加一个名为 input
的 Balance
,因为我无法从 FlowShape
的第一个 Buffer
中检索 Inlet
I想要创造。还有其他更简单的方法可以解决这个问题吗?用 1 Outlet
创建 Balance
似乎是错误的方法。
我的第二个问题类似。我无法从 flow3
检索到 Outlet
。我知道解决这个问题的唯一方法是创建另一个 Balance
,并将其 Outlet
公开为整个 FlowShape
的 Outlet
。有更好的方法解决这个问题吗?
一个Balance
is a fan-out shape that emits to the first available output. Considering you are zipping the flows in the next step, what you need is a Broadcast
。当所有输出都可用时,它将扇出到所有输出。
此外,建造者可以添加任何 Graph
的形状,包括 Flow
。您不必为此使用自定义形状。
更新后的代码:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val input = builder.add(buffer)
val broadcast = builder.add(Broadcast[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = builder.add(Flow[Int].map(_*2))
input ~> broadcast.in
broadcast.out(0) ~> flow1 ~> zip.in0
broadcast.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3.in
FlowShape(input.in, flow3.out)
})
}
我有一些类似于以下的代码:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val balance = builder.add(Balance[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = Flow[Int].map(_*2)
input ~> buffer ~> balance.in
balance.out(0) ~> flow1 ~> zip.in0
balance.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3
FlowShape(input.in, flow3) //Question 2) how to make an outlet here
})
}
请注意,我必须添加一个名为 input
的 Balance
,因为我无法从 FlowShape
的第一个 Buffer
中检索 Inlet
I想要创造。还有其他更简单的方法可以解决这个问题吗?用 1 Outlet
创建 Balance
似乎是错误的方法。
我的第二个问题类似。我无法从 flow3
检索到 Outlet
。我知道解决这个问题的唯一方法是创建另一个 Balance
,并将其 Outlet
公开为整个 FlowShape
的 Outlet
。有更好的方法解决这个问题吗?
一个Balance
is a fan-out shape that emits to the first available output. Considering you are zipping the flows in the next step, what you need is a Broadcast
。当所有输出都可用时,它将扇出到所有输出。
此外,建造者可以添加任何 Graph
的形状,包括 Flow
。您不必为此使用自定义形状。
更新后的代码:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val input = builder.add(buffer)
val broadcast = builder.add(Broadcast[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = builder.add(Flow[Int].map(_*2))
input ~> broadcast.in
broadcast.out(0) ~> flow1 ~> zip.in0
broadcast.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3.in
FlowShape(input.in, flow3.out)
})
}