如何对流程中的上游完成做出反应?

How to react to upstream completion in my flow?

假设有一个这样定义的 Akka 流:

def tee = {
  var writer: Writer = ???

  Flow.fromFunction[String, String] { msg =>
    writer.write(msg)
    msg
  }
}

上游完成后需要刷新并关闭编写器。有没有办法不求助于 GraphStageLogic 等,如此处所述 https://doc.akka.io/docs/akka/current/stream/stream-customize.html?

如果不将 Flow 转换为 Sink,则无法使用 Flow 执行此操作。

如果可以选择接收器,请执行以下操作

  def tee = {
    val writer: Writer = new StringWriter()

    Sink
      .foreach[String] { msg =>
        writer.write(msg)
      }
      .mapMaterializedValue(_.map { done =>
        writer.close()
        done
      })
  }

类似的事情可以在 akka.stream.scaladsl.StreamConverters 的帮助下完成,如下

    val sink: Sink[String, Future[IOResult]] = {
      StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
    }