如何关闭 Akka 流?

How does one close an Akka stream?

我正在将一些实验性喷射代码更新到 akka-stream (2.0.2),所以我会一点一点地浏览他的文档。我的需求之一是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。

从流内部立即关闭(终止)流的正确方法是什么?

使用 PushStage:

import akka.stream.stage._

val closeStage = new PushStage[Tpe, Tpe] {
  override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
    case elem if shouldCloseStream ⇒
      // println("stream closed")
      ctx.finish()
    case elem ⇒
      ctx.push(elem)
  }
}

您可以通过 transform 方法将 PushStageFlow 结合起来:

Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)