Fold 和 reduce 在 运行 并行时表现出不确定的行为,为什么?

Fold and reduce show non-deterministic behavior when ran in parallel, why?

所以我正在尝试使用 Akka Streams 来计算项目的出现次数。 下面的示例是我所拥有的简化版本。我需要两个管道同时工作。由于某些原因,打印结果不正确。

有谁知道为什么会这样?我是否遗漏了有关子流的重要信息?

/**
 * SIMPLE EXAMPLE
 */
object TestingObject {
  import akka.actor.ActorSystem
  import akka.stream._
  import akka.stream.scaladsl._
  import java.nio.file.Paths
  import akka.util.ByteString
  import counting._
  import graph_components._

  // implicit actor system
  implicit val system:ActorSystem = ActorSystem("Sys")

  def main(args: Array[String]): Unit = {

    val customFlow = Flow.fromGraph(GraphDSL.create() {
      implicit builder =>
        import GraphDSL.Implicits._


        // Components
        val A   = builder.add(Balance[(Int, Int)](2, waitForAllDownstreams = true));
        val B1   = builder.add(mergeCountFold.async);
        val B2   = builder.add(mergeCountFold.async);
        val C   = builder.add(Merge[(Int, Int)](2));
        val D   = builder.add(mergeCountReduce);

        // Graph
        A ~> B1 ~> C ~> D
        A ~> B2 ~> C

        FlowShape(A.in, D.out);
    })

    // Run
    Source(0 to 101)
      .groupBy(10, x => x % 4)
      .map(x => (x % 4, 1))
      .via(customFlow)
      .mergeSubstreams
      .to(Sink.foreach(println)).run();
  }

  def mergeCountReduce = Flow[(Int, Int)].reduce((l, r) => {
    println("REDUCING");
    (l._1, l._2 + r._2)
  })
  def mergeCountFold = Flow[(Int, Int)].fold[(Int,Int)](0,0)((l, r) => {
    println("FOLDING");
    (r._1, l._2 + r._2)
  })

}

两个观察:

  • mergeCountReduce 将发出它看到的第一个键和看到的值的总和(如果它没有看到任何元素,流将失败)
  • mergeCountFold 将发出它看到的最后一个键和看到的值的总和(如果它没有看到任何元素,将发出一个键和零值)

(在这两种情况下,尽管密钥始终相同)

这些观察结果都不受 async 边界的影响。

不过,在前面的 Balance 运算符的上下文中,async 引入了一个隐式缓冲区,这可以防止它包装的图形在缓冲区已满之前受到反压。 Balance 将流值发送到第一个没有背压的输出,因此如果 Balance 之后的阶段没有明显慢于上游,Balance 可能只将值发送到一个输出(B1 在这种情况下)。

在那种情况下,使用 reduceB1 会发出密钥并计数,而 B2 失败,导致整个流失败。

对于 fold,在那种情况下,B1 会发出键并计数,而 B2,没有看到任何值会发出 (0,0)。合并将按照它们发出的顺序发出它们(假设有 50/50 的机会是合理的),因此最后的折叠将具有键和计数或零和计数。