mergeSubstreamsWithParallelism 与 mergeSubstreams
mergeSubstreamsWithParallelism vs mergeSubstreams
mergeSubstreams 和 mergeSubstreamsWithParallelism 有什么区别
我首先认为这只是性能差异但是在执行此代码时
.groupBy(magicNumber, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.mergeSubstreams
我有一个 finit 流给我结果。
但是
.groupBy(numberIsp, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.map{x=>println(x);x}
.mergeSubstreamsWithParallelism(10)
我有一个不打印任何内容的无限流。
就我而言,这没有任何区别,但我现在想知道为什么会出现这种行为。
编辑
这就是问题所在。
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
implicit val ctx = implicitly[ExecutionContext](system.dispatcher)
val Integers = Source(1 to 100).map {
case n: Int => println(s"n is $n"); n
}
// same beginning, same sink, just different merges
val sameBeginning = Integers.groupBy(5, _ % 3).take(6).fold(0) {
case (a, b) => a + b
}.map {
case n: Int => println(s"Now n is $n !"); n
}
def sameSink(id: Int) = Sink.foreach[Int] {
case n: Int => println(s"Sink $id says - You are done now final val is $n")
}
val mergeWithPlenty = sameBeginning.mergeSubstreams.to(sameSink(1)).run()
val mergeWithNotEnough = sameBeginning.mergeSubstreamsWithParallelism(2).to(sameSink(2)).run()
现在,如果你 运行 那个,你几乎 马上就会看到 ...
Sink 1 says - You are done now final val is 51
Sink 1 says - You are done now final val is 57
Sink 1 says - You are done now final val is 63
很久以后...
Sink 2 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57
水槽 2 甚至没有完成!
我可以让它变得更糟......你所要做的就是增加 Sink 2 上的压力,而不是元素,而是维度,或者子流的数量.
接收器 1 获得“尽可能多的”。根据您的命令,接收器 2 被告知仅使用有限数量。
如果有限数量足够,这不是问题。例如,采用上面的相同示例,但现在将 Sink 2 更改为具有并行度 20 或 20 pipes
.
看看会发生什么!
Sink 2 says - You are done now final val is 51
Sink 1 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57
Sink 1 says - You are done now final val is 57
Sink 2 says - You are done now final val is 63
Sink 1 says - You are done now final val is 63
现在,Sink 2 赢了,我无法解释。我假设这是一个竞争条件,但关键是当有多余的管道时,它的行为完全符合您的预期。
年长 material..
def mergeSubstreams: F[Out] = mergeSubstreamsWithParallelism(Int.MaxValue)
假设您有相同版本的软件,看来不同之处在于您为“并行性”或可容忍的并发子流数量选择的整数...
可能由于并行度低,您最终遇到了流无法完成的情况。
这是实现您描述的行为的简单方法。
假设我有一个子流,它同时将 11
个元素的元素添加到一起。
您提供10
的并行度。
那么你的有限序列 55 应该被分成 11
的子序列......当有大量可用管道时你的有限序列就很好,因为它通过子流执行 5 个步骤.但是没有的时候就完不成,好像是无限的。
mergeSubstreams 和 mergeSubstreamsWithParallelism 有什么区别 我首先认为这只是性能差异但是在执行此代码时
.groupBy(magicNumber, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.mergeSubstreams
我有一个 finit 流给我结果。 但是
.groupBy(numberIsp, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.map{x=>println(x);x}
.mergeSubstreamsWithParallelism(10)
我有一个不打印任何内容的无限流。
就我而言,这没有任何区别,但我现在想知道为什么会出现这种行为。
编辑
这就是问题所在。
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
implicit val ctx = implicitly[ExecutionContext](system.dispatcher)
val Integers = Source(1 to 100).map {
case n: Int => println(s"n is $n"); n
}
// same beginning, same sink, just different merges
val sameBeginning = Integers.groupBy(5, _ % 3).take(6).fold(0) {
case (a, b) => a + b
}.map {
case n: Int => println(s"Now n is $n !"); n
}
def sameSink(id: Int) = Sink.foreach[Int] {
case n: Int => println(s"Sink $id says - You are done now final val is $n")
}
val mergeWithPlenty = sameBeginning.mergeSubstreams.to(sameSink(1)).run()
val mergeWithNotEnough = sameBeginning.mergeSubstreamsWithParallelism(2).to(sameSink(2)).run()
现在,如果你 运行 那个,你几乎 马上就会看到 ...
Sink 1 says - You are done now final val is 51
Sink 1 says - You are done now final val is 57
Sink 1 says - You are done now final val is 63
很久以后...
Sink 2 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57
水槽 2 甚至没有完成!
我可以让它变得更糟......你所要做的就是增加 Sink 2 上的压力,而不是元素,而是维度,或者子流的数量.
接收器 1 获得“尽可能多的”。根据您的命令,接收器 2 被告知仅使用有限数量。
如果有限数量足够,这不是问题。例如,采用上面的相同示例,但现在将 Sink 2 更改为具有并行度 20 或 20 pipes
.
看看会发生什么!
Sink 2 says - You are done now final val is 51
Sink 1 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57
Sink 1 says - You are done now final val is 57
Sink 2 says - You are done now final val is 63
Sink 1 says - You are done now final val is 63
现在,Sink 2 赢了,我无法解释。我假设这是一个竞争条件,但关键是当有多余的管道时,它的行为完全符合您的预期。
年长 material..
def mergeSubstreams: F[Out] = mergeSubstreamsWithParallelism(Int.MaxValue)
假设您有相同版本的软件,看来不同之处在于您为“并行性”或可容忍的并发子流数量选择的整数...
可能由于并行度低,您最终遇到了流无法完成的情况。
这是实现您描述的行为的简单方法。
假设我有一个子流,它同时将 11
个元素的元素添加到一起。
您提供10
的并行度。
那么你的有限序列 55 应该被分成 11
的子序列......当有大量可用管道时你的有限序列就很好,因为它通过子流执行 5 个步骤.但是没有的时候就完不成,好像是无限的。