如何在 FlowShape 中获取连接的入口和出口的实例?
How do I get instances of connected Inlet and Outlet in FlowShape?
如何在 FlowShape
中获取连接的 Inlet
和 Outlet
的实例?考虑以下示例
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, Unit)
val zip = builder.add(Zip[T, Unit.type])
val map = Flow[(T, Unit.type)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
val in = Inlet[T]("Req.in")
val out = Outlet[T]("Req.out")
out ~> zip.in0
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(in, messageExtractor.out)
})
当我在 Source.via()
中使用它时,出现以下异常
Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
at scala.Predef$.require(Predef.scala:219)
at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)
我错过了什么?
in
入口和 out
出口没有连接到任何东西。这就是为什么会出现异常(不幸的是这种问题只能在运行时检测到)
你想要一个流,其中唯一打开的入口是 zip 入口之一(zip.in0
,因为 zip.in1
连接到自动收报机),唯一打开的出口是messageExtractor,那么这个怎么样:
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, ())
val zip = builder.add(Zip[T, Unit])
val map = Flow[(T, Unit)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(zip.in0, messageExtractor.out)
})
如何在 FlowShape
中获取连接的 Inlet
和 Outlet
的实例?考虑以下示例
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, Unit)
val zip = builder.add(Zip[T, Unit.type])
val map = Flow[(T, Unit.type)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
val in = Inlet[T]("Req.in")
val out = Outlet[T]("Req.out")
out ~> zip.in0
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(in, messageExtractor.out)
})
当我在 Source.via()
中使用它时,出现以下异常
Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
at scala.Predef$.require(Predef.scala:219)
at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)
我错过了什么?
in
入口和 out
出口没有连接到任何东西。这就是为什么会出现异常(不幸的是这种问题只能在运行时检测到)
你想要一个流,其中唯一打开的入口是 zip 入口之一(zip.in0
,因为 zip.in1
连接到自动收报机),唯一打开的出口是messageExtractor,那么这个怎么样:
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, ())
val zip = builder.add(Zip[T, Unit])
val map = Flow[(T, Unit)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(zip.in0, messageExtractor.out)
})