mergeSubstreamsWithParallelism 与 mergeSubstreams

mergeSubstreamsWithParallelism vs mergeSubstreams

mergeSubstreams 和 mergeSubstreamsWithParallelism 有什么区别 我首先认为这只是性能差异但是在执行此代码时

.groupBy(magicNumber, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
  case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.mergeSubstreams

我有一个 finit 流给我结果。 但是

.groupBy(numberIsp, tuple => tuple._2)
.fold(("", Seq.empty[String]))
{
  case ((_, acc), tuple) => (tuple._2, acc :+ tuple._1)
}
.map{x=>println(x);x}
.mergeSubstreamsWithParallelism(10)

我有一个不打印任何内容的无限流。

就我而言,这没有任何区别,但我现在想知道为什么会出现这种行为。

编辑

这就是问题所在。

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ctx = implicitly[ExecutionContext](system.dispatcher)

  val Integers = Source(1 to 100).map {
    case n: Int => println(s"n is $n"); n
  }

  // same beginning, same sink, just different merges

  val sameBeginning = Integers.groupBy(5, _ % 3).take(6).fold(0) {
    case (a, b) => a + b
  }.map {
    case n: Int => println(s"Now n is $n !"); n
  }

  def sameSink(id: Int) = Sink.foreach[Int] {
    case n: Int => println(s"Sink $id says - You are done now final val is $n")
  }

  val mergeWithPlenty = sameBeginning.mergeSubstreams.to(sameSink(1)).run()

  val mergeWithNotEnough = sameBeginning.mergeSubstreamsWithParallelism(2).to(sameSink(2)).run()

现在,如果你 运行 那个,你几乎 马上就会看到 ...

Sink 1 says - You are done now final val is 51
Sink 1 says - You are done now final val is 57
Sink 1 says - You are done now final val is 63

很久以后...

Sink 2 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57

水槽 2 甚至没有完成!

我可以让它变得更糟......你所要做的就是增加 Sink 2 上的压力,而不是元素,而是维度,或者子流的数量.

接收器 1 获得“尽可能多的”。根据您的命令,接收器 2 被告知仅使用有限数量。

如果有限数量足够,这不是问题。例如,采用上面的相同示例,但现在将 Sink 2 更改为具有并行度 20 或 20 pipes.

看看会发生什么!

Sink 2 says - You are done now final val is 51
Sink 1 says - You are done now final val is 51
Sink 2 says - You are done now final val is 57
Sink 1 says - You are done now final val is 57
Sink 2 says - You are done now final val is 63
Sink 1 says - You are done now final val is 63

现在,Sink 2 赢了,我无法解释。我假设这是一个竞争条件,但关键是当有多余的管道时,它的行为完全符合您的预期。

年长 material..

def mergeSubstreams: F[Out] = mergeSubstreamsWithParallelism(Int.MaxValue)

假设您有相同版本的软件,看来不同之处在于您为“并行性”或可容忍的并发子流数量选择的整数...

可能由于并行度低,您最终遇到了流无法完成的情况。

这是实现您描述的行为的简单方法。

假设我有一个子流,它同时将 11 个元素的元素添加到一起。

您提供10的并行度。

那么你的有限序列 55 应该被分成 11 的子序列......当有大量可用管道时你的有限序列就很好,因为它通过子流执行 5 个步骤.但是没有的时候就完不成,好像是无限的。