Fold 和 reduce 在 运行 并行时表现出不确定的行为,为什么?
Fold and reduce show non-deterministic behavior when ran in parallel, why?
所以我正在尝试使用 Akka Streams 来计算项目的出现次数。
下面的示例是我所拥有的简化版本。我需要两个管道同时工作。由于某些原因,打印结果不正确。
有谁知道为什么会这样?我是否遗漏了有关子流的重要信息?
/**
* SIMPLE EXAMPLE
*/
object TestingObject {
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file.Paths
import akka.util.ByteString
import counting._
import graph_components._
// implicit actor system
implicit val system:ActorSystem = ActorSystem("Sys")
def main(args: Array[String]): Unit = {
val customFlow = Flow.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
// Components
val A = builder.add(Balance[(Int, Int)](2, waitForAllDownstreams = true));
val B1 = builder.add(mergeCountFold.async);
val B2 = builder.add(mergeCountFold.async);
val C = builder.add(Merge[(Int, Int)](2));
val D = builder.add(mergeCountReduce);
// Graph
A ~> B1 ~> C ~> D
A ~> B2 ~> C
FlowShape(A.in, D.out);
})
// Run
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(x => (x % 4, 1))
.via(customFlow)
.mergeSubstreams
.to(Sink.foreach(println)).run();
}
def mergeCountReduce = Flow[(Int, Int)].reduce((l, r) => {
println("REDUCING");
(l._1, l._2 + r._2)
})
def mergeCountFold = Flow[(Int, Int)].fold[(Int,Int)](0,0)((l, r) => {
println("FOLDING");
(r._1, l._2 + r._2)
})
}
两个观察:
mergeCountReduce
将发出它看到的第一个键和看到的值的总和(如果它没有看到任何元素,流将失败)
mergeCountFold
将发出它看到的最后一个键和看到的值的总和(如果它没有看到任何元素,将发出一个键和零值)
(在这两种情况下,尽管密钥始终相同)
这些观察结果都不受 async
边界的影响。
不过,在前面的 Balance
运算符的上下文中,async
引入了一个隐式缓冲区,这可以防止它包装的图形在缓冲区已满之前受到反压。 Balance
将流值发送到第一个没有背压的输出,因此如果 Balance
之后的阶段没有明显慢于上游,Balance
可能只将值发送到一个输出(B1
在这种情况下)。
在那种情况下,使用 reduce
,B1
会发出密钥并计数,而 B2
失败,导致整个流失败。
对于 fold
,在那种情况下,B1
会发出键并计数,而 B2
,没有看到任何值会发出 (0,0)
。合并将按照它们发出的顺序发出它们(假设有 50/50 的机会是合理的),因此最后的折叠将具有键和计数或零和计数。
所以我正在尝试使用 Akka Streams 来计算项目的出现次数。 下面的示例是我所拥有的简化版本。我需要两个管道同时工作。由于某些原因,打印结果不正确。
有谁知道为什么会这样?我是否遗漏了有关子流的重要信息?
/**
* SIMPLE EXAMPLE
*/
object TestingObject {
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file.Paths
import akka.util.ByteString
import counting._
import graph_components._
// implicit actor system
implicit val system:ActorSystem = ActorSystem("Sys")
def main(args: Array[String]): Unit = {
val customFlow = Flow.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
// Components
val A = builder.add(Balance[(Int, Int)](2, waitForAllDownstreams = true));
val B1 = builder.add(mergeCountFold.async);
val B2 = builder.add(mergeCountFold.async);
val C = builder.add(Merge[(Int, Int)](2));
val D = builder.add(mergeCountReduce);
// Graph
A ~> B1 ~> C ~> D
A ~> B2 ~> C
FlowShape(A.in, D.out);
})
// Run
Source(0 to 101)
.groupBy(10, x => x % 4)
.map(x => (x % 4, 1))
.via(customFlow)
.mergeSubstreams
.to(Sink.foreach(println)).run();
}
def mergeCountReduce = Flow[(Int, Int)].reduce((l, r) => {
println("REDUCING");
(l._1, l._2 + r._2)
})
def mergeCountFold = Flow[(Int, Int)].fold[(Int,Int)](0,0)((l, r) => {
println("FOLDING");
(r._1, l._2 + r._2)
})
}
两个观察:
mergeCountReduce
将发出它看到的第一个键和看到的值的总和(如果它没有看到任何元素,流将失败)mergeCountFold
将发出它看到的最后一个键和看到的值的总和(如果它没有看到任何元素,将发出一个键和零值)
(在这两种情况下,尽管密钥始终相同)
这些观察结果都不受 async
边界的影响。
不过,在前面的 Balance
运算符的上下文中,async
引入了一个隐式缓冲区,这可以防止它包装的图形在缓冲区已满之前受到反压。 Balance
将流值发送到第一个没有背压的输出,因此如果 Balance
之后的阶段没有明显慢于上游,Balance
可能只将值发送到一个输出(B1
在这种情况下)。
在那种情况下,使用 reduce
,B1
会发出密钥并计数,而 B2
失败,导致整个流失败。
对于 fold
,在那种情况下,B1
会发出键并计数,而 B2
,没有看到任何值会发出 (0,0)
。合并将按照它们发出的顺序发出它们(假设有 50/50 的机会是合理的),因此最后的折叠将具有键和计数或零和计数。