如何对流程中的上游完成做出反应?
How to react to upstream completion in my flow?
假设有一个这样定义的 Akka 流:
def tee = {
var writer: Writer = ???
Flow.fromFunction[String, String] { msg =>
writer.write(msg)
msg
}
}
上游完成后需要刷新并关闭编写器。有没有办法不求助于 GraphStageLogic
等,如此处所述 https://doc.akka.io/docs/akka/current/stream/stream-customize.html?
如果不将 Flow 转换为 Sink,则无法使用 Flow 执行此操作。
如果可以选择接收器,请执行以下操作
def tee = {
val writer: Writer = new StringWriter()
Sink
.foreach[String] { msg =>
writer.write(msg)
}
.mapMaterializedValue(_.map { done =>
writer.close()
done
})
}
类似的事情可以在 akka.stream.scaladsl.StreamConverters
的帮助下完成,如下
val sink: Sink[String, Future[IOResult]] = {
StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
}
假设有一个这样定义的 Akka 流:
def tee = {
var writer: Writer = ???
Flow.fromFunction[String, String] { msg =>
writer.write(msg)
msg
}
}
上游完成后需要刷新并关闭编写器。有没有办法不求助于 GraphStageLogic
等,如此处所述 https://doc.akka.io/docs/akka/current/stream/stream-customize.html?
如果不将 Flow 转换为 Sink,则无法使用 Flow 执行此操作。
如果可以选择接收器,请执行以下操作
def tee = {
val writer: Writer = new StringWriter()
Sink
.foreach[String] { msg =>
writer.write(msg)
}
.mapMaterializedValue(_.map { done =>
writer.close()
done
})
}
类似的事情可以在 akka.stream.scaladsl.StreamConverters
的帮助下完成,如下
val sink: Sink[String, Future[IOResult]] = {
StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
}