在 Like Flow 中操作 Seq 元素

Manipulate Seq Elements in an Akka Flow

我有 2 个流程,如下所示:

val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...

我想将它们组合成如下所示的便捷方法:

val aToSeqOfC: Flow[A, Seq[C], NotUsed]

到目前为止,我有以下内容,但我知道它只是以 C 个元素结束,而不是 Seq[C]

Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)

在这种情况下如何保留 Seq

间接回答

在我看来,您的问题突出了处理 akka 流时常见的 "rookie mistakes" 之一。将业务逻辑放在 akka 流结构中通常不是好的组织。您的问题表明您有以下形式的内容:

val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B => 
  //business logic
}

更理想的情况是:

//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
  //business logic
}

val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc

在上面的 "ideal" 示例中,Flow 只是在正常的非 akka 业务逻辑之上的一层薄薄的表面。

单独的逻辑可以简单地解决你原来的问题:

val aToSeqOfC : Flow[A, Seq[C], NotUsed] = 
  aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))

直接回答

如果您无法重组代码,那么唯一可用的选择就是处理 Futures。您需要在单独的子流中使用 bToC

val mat : akka.stream.Materializer = ???

val seqBToSeqC : Seq[B] => Future[Seq[C]] = 
  (seqB) =>
    Source
      .apply(seqB.toIterable)
      .via(bToC)
      .to(Sink.seq[C])
      .run()

然后您可以在 mapAsync 中使用此函数来构建您正在寻找的流程:

val parallelism = 10

val aToSeqOfC: Flow[A, Seq[C], NotUsed] = 
  aToSeqB.mapAsync(parallelism)(seqBtoSeqC)