Akka Streams:使用 KillSwitch 关闭后创建另一个 RunnableGraph
Akka Streams: Creating another RunnableGraph after shutdown with KillSwitch
我尝试关闭转换一些数字的流并创建同一图形蓝图的另一个流。但是,第二个流实例不会 运行 或者至少不会向控制台打印任何内容。
我错过了什么?
object KillSwitchSample extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val killSwitch = KillSwitches.shared("switch")
val stream1 = createStream("stream 1")
stream1.run()
Thread.sleep(200)
killSwitch.shutdown()
val stream2 = createStream("stream 2")
stream2.run()
Thread.sleep(200)
killSwitch.shutdown()
def createStream(streamName: String): RunnableGraph[NotUsed] = {
Source.fromGraph(new NumbersSource)
.via(killSwitch.flow)
.map(el => s"$streamName: $el")
.to(Sink.foreach(println))
}
}
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
您使用共享 KillSwitch
。共享 KillSwitch
只能切换一次,之后它会记住它已经被切换,因此也会立即终止终止后续流。
这就是您的代码所发生的情况。您在第二次 运行 图形之前触发了终止开关。
您可以使用 KillSwitches.single
代替每次获得新的 KillSwitch
:
def createStream(streamName: String): RunnableGraph[UniqueKillSwitch] =
Source.fromGraph(new NumbersSource)
.map(el => s"$streamName: $el")
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.foreach(println))
val switch1 = createStream("a").run()
// ...
switch1.shutdown()
val switch2 = createStream("b").run()
// ...
switch2.shutdown()
我尝试关闭转换一些数字的流并创建同一图形蓝图的另一个流。但是,第二个流实例不会 运行 或者至少不会向控制台打印任何内容。
我错过了什么?
object KillSwitchSample extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val killSwitch = KillSwitches.shared("switch")
val stream1 = createStream("stream 1")
stream1.run()
Thread.sleep(200)
killSwitch.shutdown()
val stream2 = createStream("stream 2")
stream2.run()
Thread.sleep(200)
killSwitch.shutdown()
def createStream(streamName: String): RunnableGraph[NotUsed] = {
Source.fromGraph(new NumbersSource)
.via(killSwitch.flow)
.map(el => s"$streamName: $el")
.to(Sink.foreach(println))
}
}
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
您使用共享 KillSwitch
。共享 KillSwitch
只能切换一次,之后它会记住它已经被切换,因此也会立即终止终止后续流。
这就是您的代码所发生的情况。您在第二次 运行 图形之前触发了终止开关。
您可以使用 KillSwitches.single
代替每次获得新的 KillSwitch
:
def createStream(streamName: String): RunnableGraph[UniqueKillSwitch] =
Source.fromGraph(new NumbersSource)
.map(el => s"$streamName: $el")
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.foreach(println))
val switch1 = createStream("a").run()
// ...
switch1.shutdown()
val switch2 = createStream("b").run()
// ...
switch2.shutdown()