Akka-streams:如何将流连接到流?

Akka-streams: How-to connect a stream to a stream?

如果流程 A 产生

Pair<Source<ByteString,?>, Object> 

如何将此出口连接到映射在源上的流 B 的入口。例如流 B 的出口将是

Pair<InputStream<Long>,Object>.

如果您获得了您提供的类型的来源:

Source< Pair<Source<ByteString, BoxedUnit>, Object>, BoxedUnit> pairSource;

并且您还获得了将 ByteString 值转换为 InputStream<Long>:

的流程
Flow<ByteString, InputStream<Long>, ?> byteStrToInputStream;

然后可以使用 flatMapConcat 进行转换:

Source< Pair<InputStream<Long>,Object>, BoxedUnit> inputStreamSource = 
  pairSource.flatMapConcat( pair -> {
    pair.getKey()
        .via(byteStrToInputStream)
        .map(inputStream -> ImmutablePair(inputStream, pair.getValue()))
  })

每一个传入的ByteStrings Sources都被耗尽,结果被串联起来。每对的原始右侧,即Object被添加到每个InputStream.