Akka-Stream 流中流
Akka-Stream stream within stream
我想弄清楚如何处理这样一种情况:在您的某个阶段中,您需要调用 return InputStream,我会将该流作为阶段的源来处理进一步下降。
例如
Source.map(e => Calls that return an InputStream)
.via(processingFlow).runwith(sink.ignore)
我希望进入处理流程的元素与来自 InputStream 的元素一样。这基本上是一种情况,我正在拖尾文件,读取每一行,该行为我提供了有关我需要针对 CLI api 进行的调用的信息,在进行该调用时,我将 Stdout 作为 InputStream 从读取结果。大多数时候结果会很大,所以我可以把所有的东西都收集在内存中。
- 您可以使用
StreamConverters
实用程序从 java.io 流中获取 Source
s 和 Sink
s。更多信息 here.
- 您可以使用
flatMapConcat
或 flatMapMerge
将 Source
的流扁平化为单个流。更多信息 here.
一个简单的例子可以是:
val source: Source[String, NotUsed] = ???
def gimmeInputStream(name: String): InputStream = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.map(gimmeInputStream)
.flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
.via(processingFlow)
.runWith(Sink.ignore)
然而,Akka Streams 为 FileIO
对象中的 read/write 文件提供了更惯用的 DSL。更多信息 here.
示例变为:
val source: Source[String, NotUsed] = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
.via(processingFlow)
.runWith(Sink.ignore)
我想弄清楚如何处理这样一种情况:在您的某个阶段中,您需要调用 return InputStream,我会将该流作为阶段的源来处理进一步下降。
例如
Source.map(e => Calls that return an InputStream)
.via(processingFlow).runwith(sink.ignore)
我希望进入处理流程的元素与来自 InputStream 的元素一样。这基本上是一种情况,我正在拖尾文件,读取每一行,该行为我提供了有关我需要针对 CLI api 进行的调用的信息,在进行该调用时,我将 Stdout 作为 InputStream 从读取结果。大多数时候结果会很大,所以我可以把所有的东西都收集在内存中。
- 您可以使用
StreamConverters
实用程序从 java.io 流中获取Source
s 和Sink
s。更多信息 here. - 您可以使用
flatMapConcat
或flatMapMerge
将Source
的流扁平化为单个流。更多信息 here.
一个简单的例子可以是:
val source: Source[String, NotUsed] = ???
def gimmeInputStream(name: String): InputStream = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.map(gimmeInputStream)
.flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
.via(processingFlow)
.runWith(Sink.ignore)
然而,Akka Streams 为 FileIO
对象中的 read/write 文件提供了更惯用的 DSL。更多信息 here.
示例变为:
val source: Source[String, NotUsed] = ???
val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
source
.flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
.via(processingFlow)
.runWith(Sink.ignore)