优雅地停止 Akka Stream

Gracefully stopping an Akka Stream

鉴于 akka 文档,我希望流在第 7/8 个元素之后停止。为什么它不停下来?它一直持续到最后一个元素(第 20 个)。

我想要实现的是在系统终止时,流停止请求新元素并且系统等待终止,直到流中的所有元素都被完全处理(到达接收器)

object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
      () => Future(killStream.shutdown()).map(_ => Done)
    }

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
      () => done
    }

  Thread.sleep(720)

  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}

我能够“解决”这个问题,但我很难解释这个行为。

出于某种原因,您对 map(特别是 Thread.sleep)的实施导致 killStream.shutdown() 中的 future/promise 回调传播速度不够快。我的猜测是它用阻塞的线程填满了主调度程序。

您可以增加源元素的数量 Source(1 to 10000)Await 超时以查看终止开关最终传播。

但是,添加异步边界确实解决了问题并取得了预期的结果。

将您的 map 替换为以下 mapAsync,它按预期工作。

.mapAsync(1) { i =>
  Future {
    system.log.info(s"Start task $i")
    Thread.sleep(100)
    system.log.info(s"Start task $i")
    i
  }
}

通过添加 async 标记可以获得类似的结果。

.map(i => {
  system.log.info(s"Start task $i")
  Thread.sleep(100)
  system.log.info(s"End task $i")
  i
})
.async