在 Like Flow 中操作 Seq 元素
Manipulate Seq Elements in an Akka Flow
我有 2 个流程,如下所示:
val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...
我想将它们组合成如下所示的便捷方法:
val aToSeqOfC: Flow[A, Seq[C], NotUsed]
到目前为止,我有以下内容,但我知道它只是以 C
个元素结束,而不是 Seq[C]
。
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
在这种情况下如何保留 Seq
?
间接回答
在我看来,您的问题突出了处理 akka 流时常见的 "rookie mistakes" 之一。将业务逻辑放在 akka 流结构中通常不是好的组织。您的问题表明您有以下形式的内容:
val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B =>
//business logic
}
更理想的情况是:
//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
//business logic
}
val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
在上面的 "ideal" 示例中,Flow
只是在正常的非 akka 业务逻辑之上的一层薄薄的表面。
单独的逻辑可以简单地解决你原来的问题:
val aToSeqOfC : Flow[A, Seq[C], NotUsed] =
aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
直接回答
如果您无法重组代码,那么唯一可用的选择就是处理 Futures。您需要在单独的子流中使用 bToC
:
val mat : akka.stream.Materializer = ???
val seqBToSeqC : Seq[B] => Future[Seq[C]] =
(seqB) =>
Source
.apply(seqB.toIterable)
.via(bToC)
.to(Sink.seq[C])
.run()
然后您可以在 mapAsync
中使用此函数来构建您正在寻找的流程:
val parallelism = 10
val aToSeqOfC: Flow[A, Seq[C], NotUsed] =
aToSeqB.mapAsync(parallelism)(seqBtoSeqC)
我有 2 个流程,如下所示:
val aToSeqOfB: Flow[A, Seq[B], NotUsed] = ...
val bToC: Flow[B, C, NotUsed] = ...
我想将它们组合成如下所示的便捷方法:
val aToSeqOfC: Flow[A, Seq[C], NotUsed]
到目前为止,我有以下内容,但我知道它只是以 C
个元素结束,而不是 Seq[C]
。
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
在这种情况下如何保留 Seq
?
间接回答
在我看来,您的问题突出了处理 akka 流时常见的 "rookie mistakes" 之一。将业务逻辑放在 akka 流结构中通常不是好的组织。您的问题表明您有以下形式的内容:
val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B =>
//business logic
}
更理想的情况是:
//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
//business logic
}
val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
在上面的 "ideal" 示例中,Flow
只是在正常的非 akka 业务逻辑之上的一层薄薄的表面。
单独的逻辑可以简单地解决你原来的问题:
val aToSeqOfC : Flow[A, Seq[C], NotUsed] =
aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
直接回答
如果您无法重组代码,那么唯一可用的选择就是处理 Futures。您需要在单独的子流中使用 bToC
:
val mat : akka.stream.Materializer = ???
val seqBToSeqC : Seq[B] => Future[Seq[C]] =
(seqB) =>
Source
.apply(seqB.toIterable)
.via(bToC)
.to(Sink.seq[C])
.run()
然后您可以在 mapAsync
中使用此函数来构建您正在寻找的流程:
val parallelism = 10
val aToSeqOfC: Flow[A, Seq[C], NotUsed] =
aToSeqB.mapAsync(parallelism)(seqBtoSeqC)