优雅地停止 Akka Stream
Gracefully stopping an Akka Stream
鉴于 akka 文档,我希望流在第 7/8 个元素之后停止。为什么它不停下来?它一直持续到最后一个元素(第 20 个)。
我想要实现的是在系统终止时,流停止请求新元素并且系统等待终止,直到流中的所有元素都被完全处理(到达接收器)
object StreamKillSwitch extends App {
implicit val system = ActorSystem(Behaviors.ignore, "sks")
implicit val ec: ExecutionContext = system.executionContext
val (killStream, done) =
Source(1 to 20)
.viaMat(KillSwitches.single)(Keep.right)
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.toMat(Sink.foreach(println))(Keep.both)
.run()
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
() => Future(killStream.shutdown()).map(_ => Done)
}
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
() => done
}
Thread.sleep(720)
system.terminate()
Await.ready(system.whenTerminated, 5.seconds)
}
我能够“解决”这个问题,但我很难解释这个行为。
出于某种原因,您对 map
(特别是 Thread.sleep
)的实施导致 killStream.shutdown()
中的 future/promise 回调传播速度不够快。我的猜测是它用阻塞的线程填满了主调度程序。
您可以增加源元素的数量 Source(1 to 10000)
和 Await
超时以查看终止开关最终传播。
但是,添加异步边界确实解决了问题并取得了预期的结果。
将您的 map
替换为以下 mapAsync
,它按预期工作。
.mapAsync(1) { i =>
Future {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"Start task $i")
i
}
}
通过添加 async
标记可以获得类似的结果。
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.async
鉴于 akka 文档,我希望流在第 7/8 个元素之后停止。为什么它不停下来?它一直持续到最后一个元素(第 20 个)。
我想要实现的是在系统终止时,流停止请求新元素并且系统等待终止,直到流中的所有元素都被完全处理(到达接收器)
object StreamKillSwitch extends App {
implicit val system = ActorSystem(Behaviors.ignore, "sks")
implicit val ec: ExecutionContext = system.executionContext
val (killStream, done) =
Source(1 to 20)
.viaMat(KillSwitches.single)(Keep.right)
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.toMat(Sink.foreach(println))(Keep.both)
.run()
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
() => Future(killStream.shutdown()).map(_ => Done)
}
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
() => done
}
Thread.sleep(720)
system.terminate()
Await.ready(system.whenTerminated, 5.seconds)
}
我能够“解决”这个问题,但我很难解释这个行为。
出于某种原因,您对 map
(特别是 Thread.sleep
)的实施导致 killStream.shutdown()
中的 future/promise 回调传播速度不够快。我的猜测是它用阻塞的线程填满了主调度程序。
您可以增加源元素的数量 Source(1 to 10000)
和 Await
超时以查看终止开关最终传播。
但是,添加异步边界确实解决了问题并取得了预期的结果。
将您的 map
替换为以下 mapAsync
,它按预期工作。
.mapAsync(1) { i =>
Future {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"Start task $i")
i
}
}
通过添加 async
标记可以获得类似的结果。
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.async