如何关闭 Akka 流?
How does one close an Akka stream?
我正在将一些实验性喷射代码更新到 akka-stream (2.0.2),所以我会一点一点地浏览他的文档。我的需求之一是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。
从流内部立即关闭(终止)流的正确方法是什么?
使用 PushStage
:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
您可以通过 transform
方法将 PushStage
与 Flow
结合起来:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)
我正在将一些实验性喷射代码更新到 akka-stream (2.0.2),所以我会一点一点地浏览他的文档。我的需求之一是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。
从流内部立即关闭(终止)流的正确方法是什么?
使用 PushStage
:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
您可以通过 transform
方法将 PushStage
与 Flow
结合起来:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)