Akka-streams mapConcat 不适用于循环的 RunnableGraph
Akka-streams mapConcat not working with cycled RunnableGraph
我有 RunnableGraph
喜欢以下内容。当 broadcast
和 merge
阶段之间有简单的 map
时,一切都很好。但是,当涉及到 mapConcat
时,此代码在消耗第一个元素后不起作用。
我想知道为什么它不起作用。
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val M = b.add(MergePreferred[Int](1))
val B = b.add(Broadcast[Int](2))
val S = Source(List(3))
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
ClosedShape
})
// run() output:
// 3
// List(2,2,2)
mapConcat
阶段阻塞了反馈循环,这是意料之中的。考虑以下事件链:
mapConcat
函数打印 List(2,2,2)
mapConcat
阶段需要发出 3 个可用元素中的第一个 (2, 2, 2)
- 需求必须来自 Merge 阶段,因此来自 Broadcast 阶段。
- 广播阶段背压,如果它的任何下游背压。它的下游是
Sink.ignore
(从不背压),以及 mapConcat
本身。
mapConcat
背压如果 "there are still remaining elements from the previously calculated collection",根据 docs。确实是这样。
也就是说,你的周期不平衡。您在反馈循环中引入的元素多于移除的元素。
此问题在 this documentation page 中有详细说明,其中还提供了一些解决方案。对于您的特定情况,由于您拥有过滤阶段,引入大于 13 的缓冲区将打印所有元素。但是,请注意该图只会挂起而不会在之后完成。
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
我有 RunnableGraph
喜欢以下内容。当 broadcast
和 merge
阶段之间有简单的 map
时,一切都很好。但是,当涉及到 mapConcat
时,此代码在消耗第一个元素后不起作用。
我想知道为什么它不起作用。
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val M = b.add(MergePreferred[Int](1))
val B = b.add(Broadcast[Int](2))
val S = Source(List(3))
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
ClosedShape
})
// run() output:
// 3
// List(2,2,2)
mapConcat
阶段阻塞了反馈循环,这是意料之中的。考虑以下事件链:
mapConcat
函数打印List(2,2,2)
mapConcat
阶段需要发出 3 个可用元素中的第一个 (2, 2, 2)- 需求必须来自 Merge 阶段,因此来自 Broadcast 阶段。
- 广播阶段背压,如果它的任何下游背压。它的下游是
Sink.ignore
(从不背压),以及mapConcat
本身。 mapConcat
背压如果 "there are still remaining elements from the previously calculated collection",根据 docs。确实是这样。
也就是说,你的周期不平衡。您在反馈循环中引入的元素多于移除的元素。
此问题在 this documentation page 中有详细说明,其中还提供了一些解决方案。对于您的特定情况,由于您拥有过滤阶段,引入大于 13 的缓冲区将打印所有元素。但是,请注意该图只会挂起而不会在之后完成。
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B