从 GraphStage (Akka 2.4.2) 内部关闭 Akka 流

Closing an Akka stream from inside a GraphStage (Akka 2.4.2)

在 Akka Stream 2.4.2 中,PushStage 已被弃用。对于 Streams 2.0.3,我使用的是这个答案中的解决方案:

这是:

import akka.stream.stage._

    val closeStage = new PushStage[Tpe, Tpe] {
      override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          ctx.finish()
        case elem ⇒
          ctx.push(elem)
      }
    }

我如何从 GraphStage / onPush() 内部立即关闭 2.4.2 中的流?

使用这样的东西:

val closeStage = new GraphStage[FlowShape[Tpe, Tpe]] {
  val in = Inlet[Tpe]("closeStage.in")
  val out = Outlet[Tpe]("closeStage.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          completeStage()
        case msg ⇒
          push(out, msg)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

它更冗长,但一方面可以以可重用的方式定义此逻辑,另一方面不再需要担心流元素之间的差异,因为可以处理 GraphStage与处理流的方式相同:

val flow: Flow[Tpe] = ???
val newFlow = flow.via(closeStage)

张贴供其他人参考。 sschaef 的回答在程序上是正确的,但是连接保持打开状态一分钟,最终会超时并抛出 "no activity" 异常,关闭连接。

在进一步阅读文档时,我注意到当所有上游流完成时连接已关闭。就我而言,我有不止一个上游。

对于我的特定用例,解决方法是添加 eagerComplete=true 以在任何(而不是所有)上游完成后立即关闭流。类似于:

... = builder.add(Merge[MyObj](3,eagerComplete = true))

希望这对某人有所帮助。