Akka-streams:如何将流连接到流?
Akka-streams: How-to connect a stream to a stream?
如果流程 A 产生
Pair<Source<ByteString,?>, Object>
如何将此出口连接到映射在源上的流 B 的入口。例如流 B 的出口将是
Pair<InputStream<Long>,Object>.
如果您获得了您提供的类型的来源:
Source< Pair<Source<ByteString, BoxedUnit>, Object>, BoxedUnit> pairSource;
并且您还获得了将 ByteString
值转换为 InputStream<Long>
:
的流程
Flow<ByteString, InputStream<Long>, ?> byteStrToInputStream;
然后可以使用 flatMapConcat 进行转换:
Source< Pair<InputStream<Long>,Object>, BoxedUnit> inputStreamSource =
pairSource.flatMapConcat( pair -> {
pair.getKey()
.via(byteStrToInputStream)
.map(inputStream -> ImmutablePair(inputStream, pair.getValue()))
})
每一个传入的ByteStrings Sources都被耗尽,结果被串联起来。每对的原始右侧,即Object
被添加到每个InputStream.
如果流程 A 产生
Pair<Source<ByteString,?>, Object>
如何将此出口连接到映射在源上的流 B 的入口。例如流 B 的出口将是
Pair<InputStream<Long>,Object>.
如果您获得了您提供的类型的来源:
Source< Pair<Source<ByteString, BoxedUnit>, Object>, BoxedUnit> pairSource;
并且您还获得了将 ByteString
值转换为 InputStream<Long>
:
Flow<ByteString, InputStream<Long>, ?> byteStrToInputStream;
然后可以使用 flatMapConcat 进行转换:
Source< Pair<InputStream<Long>,Object>, BoxedUnit> inputStreamSource =
pairSource.flatMapConcat( pair -> {
pair.getKey()
.via(byteStrToInputStream)
.map(inputStream -> ImmutablePair(inputStream, pair.getValue()))
})
每一个传入的ByteStrings Sources都被耗尽,结果被串联起来。每对的原始右侧,即Object
被添加到每个InputStream.