转换 collections 流的内部元素

Transforming the inner elements of a stream of collections

我最近在空闲时间学习使用 Akka Streams(在 Scala 和 java 中)并且想知道如何实现以下场景。

我有一个连续的非常大的 Collection 流进入我的管道,我想让管道转换每个 Collection 中的元素。

将 Collection 转换为其元素流很容易,但我还需要将 1 Collection 中所有转换后的元素重新组合成 1 个新的 Collection(包含只有转换后的 objects 之前也在原始 collection 中。所以我必须知道 1 Collection 的特定元素流何时被处理,因为那时我可以发出转换后的 collection 以便在通用管道中进一步处理。

根据评论者的建议,您可以在 transformationPipeline 到 assemble 列表类型元素中使用 fold。要在 运行 Stream 时保持列表边界,而不是 mapConcat 使用 flatMapConcat,如以下简单示例所示:

def transform(s: String): Int = s.length

val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
  fold(List.empty[Int])((ls, s) => transform(s) :: ls).
  map(_.reverse)

val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
  flatMapConcat(Source(_).via(transformationPipeline))

Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
  via(flow).
  runForeach(println)
// List(1, 2)
// List(2, 3, 1)