Akka-streams mapConcat 不适用于循环的 RunnableGraph

Akka-streams mapConcat not working with cycled RunnableGraph

我有 RunnableGraph 喜欢以下内容。当 broadcastmerge 阶段之间有简单的 map 时,一切都很好。但是,当涉及到 mapConcat 时,此代码在消耗第一个元素后不起作用。

我想知道为什么它不起作用。

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val M = b.add(MergePreferred[Int](1))
val B = b.add(Broadcast[Int](2))
val S = Source(List(3))

S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0)  <~ B
ClosedShape 
}) 

// run() output: 
// 3
// List(2,2,2)

mapConcat 阶段阻塞了反馈循环,这是意料之中的。考虑以下事件链:

  1. mapConcat 函数打印 List(2,2,2)
  2. mapConcat 阶段需要发出 3 个可用元素中的第一个 (2, 2, 2)
  3. 需求必须来自 Merge 阶段,因此来自 Broadcast 阶段。
  4. 广播阶段背压,如果它的任何下游背压。它的下游是 Sink.ignore(从不背压),以及 mapConcat 本身。
  5. mapConcat 背压如果 "there are still remaining elements from the previously calculated collection",根据 docs。确实是这样。

也就是说,你的周期不平衡。您在反馈循环中引入的元素多于移除的元素。

此问题在 this documentation page 中有详细说明,其中还提供了一些解决方案。对于您的特定情况,由于您拥有过滤阶段,引入大于 13 的缓冲区将打印所有元素。但是,请注意该图只会挂起而不会在之后完成。

S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0)  <~ B