Akka Stream DSL 图 KillSwitch
Akka Stream DSL graph KillSwitch
我正在玩 Akka Streams 并使用 Alpakka 从文件流式传输内容。我需要在一段时间后停止流,所以我想使用 KillSwitc
h。但是我不知道怎么用,因为我用的是graph DSL
我的图表如下所示:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
source ~> mainFlow ~> sink
ClosedShape
})
graph.run()
我在这里找到了解决方案:
但是,如果我使用的是图形 DSL,我不知道如何应用它。你能给我一些建议吗?
要在 GraphDSL 中显示具体化值,您可以将具体化为该值的阶段传递给 create
方法。用一个例子更容易解释。你的情况:
val switch = KillSwitches.single[Int]
val graph: RunnableGraph[UniqueKillSwitch] =
RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
import GraphDSL.Implicits._
source ~> mainFlow ~> sw ~> sink
ClosedShape
})
val ks = graph.run()
ks.shutdown()
我正在玩 Akka Streams 并使用 Alpakka 从文件流式传输内容。我需要在一段时间后停止流,所以我想使用 KillSwitc
h。但是我不知道怎么用,因为我用的是graph DSL
我的图表如下所示:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
source ~> mainFlow ~> sink
ClosedShape
})
graph.run()
我在这里找到了解决方案:
但是,如果我使用的是图形 DSL,我不知道如何应用它。你能给我一些建议吗?
要在 GraphDSL 中显示具体化值,您可以将具体化为该值的阶段传递给 create
方法。用一个例子更容易解释。你的情况:
val switch = KillSwitches.single[Int]
val graph: RunnableGraph[UniqueKillSwitch] =
RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw =>
import GraphDSL.Implicits._
source ~> mainFlow ~> sw ~> sink
ClosedShape
})
val ks = graph.run()
ks.shutdown()