转换 collections 流的内部元素
Transforming the inner elements of a stream of collections
我最近在空闲时间学习使用 Akka Streams(在 Scala 和 java 中)并且想知道如何实现以下场景。
我有一个连续的非常大的 Collection 流进入我的管道,我想让管道转换每个 Collection 中的元素。
将 Collection 转换为其元素流很容易,但我还需要将 1 Collection 中所有转换后的元素重新组合成 1 个新的 Collection(包含只有转换后的 objects 之前也在原始 collection 中。所以我必须知道 1 Collection 的特定元素流何时被处理,因为那时我可以发出转换后的 collection 以便在通用管道中进一步处理。
根据评论者的建议,您可以在 transformationPipeline
到 assemble 列表类型元素中使用 fold
。要在 运行 Stream 时保持列表边界,而不是 mapConcat
使用 flatMapConcat
,如以下简单示例所示:
def transform(s: String): Int = s.length
val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
fold(List.empty[Int])((ls, s) => transform(s) :: ls).
map(_.reverse)
val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
flatMapConcat(Source(_).via(transformationPipeline))
Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
via(flow).
runForeach(println)
// List(1, 2)
// List(2, 3, 1)
我最近在空闲时间学习使用 Akka Streams(在 Scala 和 java 中)并且想知道如何实现以下场景。
我有一个连续的非常大的 Collection 流进入我的管道,我想让管道转换每个 Collection 中的元素。
将 Collection 转换为其元素流很容易,但我还需要将 1 Collection 中所有转换后的元素重新组合成 1 个新的 Collection(包含只有转换后的 objects 之前也在原始 collection 中。所以我必须知道 1 Collection 的特定元素流何时被处理,因为那时我可以发出转换后的 collection 以便在通用管道中进一步处理。
根据评论者的建议,您可以在 transformationPipeline
到 assemble 列表类型元素中使用 fold
。要在 运行 Stream 时保持列表边界,而不是 mapConcat
使用 flatMapConcat
,如以下简单示例所示:
def transform(s: String): Int = s.length
val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
fold(List.empty[Int])((ls, s) => transform(s) :: ls).
map(_.reverse)
val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
flatMapConcat(Source(_).via(transformationPipeline))
Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
via(flow).
runForeach(println)
// List(1, 2)
// List(2, 3, 1)