Akka-Stream 流中流

Akka-Stream stream within stream

我想弄清楚如何处理这样一种情况:在您的某个阶段中,您需要调用 return InputStream,我会将该流作为阶段的源来处理进一步下降。

例如

 Source.map(e => Calls that return an InputStream)
 .via(processingFlow).runwith(sink.ignore)

我希望进入处理流程的元素与来自 InputStream 的元素一样。这基本上是一种情况,我正在拖尾文件,读取每一行,该行为我提供了有关我需要针对 CLI api 进行的调用的信息,在进行该调用时,我将 Stdout 作为 InputStream 从读取结果。大多数时候结果会很大,所以我可以把所有的东西都收集在内存中。

  • 您可以使用 StreamConverters 实用程序从 java.io 流中获取 Sources 和 Sinks。更多信息 here.
  • 您可以使用 flatMapConcatflatMapMergeSource 的流扁平化为单个流。更多信息 here.

一个简单的例子可以是:

  val source: Source[String, NotUsed] = ???
  def gimmeInputStream(name: String): InputStream = ???
  val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???

  source
    .map(gimmeInputStream)
    .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
    .via(processingFlow)
    .runWith(Sink.ignore)

然而,Akka Streams 为 FileIO 对象中的 read/write 文件提供了更惯用的 DSL。更多信息 here.

示例变为:

  val source: Source[String, NotUsed] = ???
  val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???

  source
    .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
    .via(processingFlow)
    .runWith(Sink.ignore)