Does groupBy in Akka Streams create substreams that run in parallel?

Does the groupBy in Akka stream creates substreams that run in parallel?

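I created this groupBy -> map -> mergeSubstreamsWithParallelism example with Akka Streams. The course I'm taking says that groupBy creates X substreams based on the parameter I pass to it, and that I then have to merge those substreams back into a single stream. So I understood that the map operator runs in parallel. Is that right?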

If so, why do I see the same thread executing the map operator in this code:

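import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.util.{Failure, Success}

// In Akka 2.6+ an implicit ActorSystem also provides the Materializer needed by run()
implicit val system: ActorSystem = ActorSystem("GroupByExample")
import system.dispatcher // ExecutionContext required by onComplete

val textSource = Source(List(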
  "I love Akka streams", // odd
  "this has even characters", // even
  "this is amazing", // odd
  "learning Akka at the Rock the JVM", // odd
  "Let's rock the JVM", // even
  "123", // odd
  "1234" // even
))
val totalCharCountFuture = textSource
  .groupBy(2, string => string.length % 2)
  .map { c =>
    println(s"I am running on thread [${Thread.currentThread().getId}]")
    c.length
  }// .async // this operator runs in parallel
  .mergeSubstreamsWithParallelism(2)
  .toMat(Sink.reduce[Int](_ + _))(Keep.right)
  .run()
totalCharCountFuture.onComplete {
  case Success(value) => println(s"total char count: $value")
  case Failure(exception) => println(s"failed computation: $exception")
}

Output:

I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
total char count: 116

Then I added .async to make the operator run asynchronously, and now the output shows different threads executing the map operator:

I am running on thread [21]
I am running on thread [21]
I am running on thread [21]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]

I read this in the Akka documentation about asynchronous boundaries:

Put an asynchronous boundary around this Flow. If this is a SubFlow (created e.g. by groupBy), this creates an asynchronous boundary around each materialized sub-flow, not the super-flow. That way, the super-flow will communicate with sub-flows asynchronously.

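So, do I need .async after the groupBy to ensure that all substreams execute in parallel? And is the test I'm doing a valid way to verify the parallelism of an operator in Akka Streams?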

Thanks

The short answer is "yes": you need async.
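On whether the thread-ID test is valid: thread IDs alone are a weak signal. An actor is not pinned to one thread, so its work can hop between dispatcher threads over time without two elements ever being processed at the same moment; different IDs do not by themselves prove parallelism. A more direct check is to count how many map calls are in flight simultaneously. The sketch below assumes Akka 2.6 (where an implicit ActorSystem supplies the Materializer); the object name `ParallelismProbe`, the `run` helper and the 50 ms `Thread.sleep` standing in for real work are illustrative choices, not part of the original code.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

object ParallelismProbe {
  val texts: List[String] = List(
    "I love Akka streams", "this has even characters", "this is amazing",
    "learning Akka at the Rock the JVM", "Let's rock the JVM", "123", "1234")

  // Runs groupBy -> map -> mergeSubstreamsWithParallelism and returns
  // (total char count, highest number of map calls observed running at once).
  def run(withAsync: Boolean)(implicit system: ActorSystem): (Int, Int) = {
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    val substreams = Source(texts)
      .groupBy(2, (s: String) => s.length % 2)
      .map { s =>
        val now = inFlight.incrementAndGet()
        maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        Thread.sleep(50) // simulated work; blocking like this is for the demo only
        inFlight.decrementAndGet()
        s.length
      }

    // .async on a SubFlow puts a boundary around each materialized substream
    val maybeAsync = if (withAsync) substreams.async else substreams

    val total: Future[Int] = maybeAsync
      .mergeSubstreamsWithParallelism(2)
      .toMat(Sink.reduce[Int](_ + _))(Keep.right)
      .run()

    (Await.result(total, 10.seconds), maxInFlight.get())
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ParallelismProbe")
    try {
      println(s"without async: ${run(withAsync = false)}")
      println(s"with async:    ${run(withAsync = true)}")
    } finally {
      system.terminate()
    }
  }
}
```

Without the boundary everything is fused into one actor, so the in-flight count can never exceed 1. With .async, a count of 2 shows that the two substreams genuinely overlapped in time; since scheduling is up to the dispatcher, reaching 2 is likely but not guaranteed on every run.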

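As a rule of thumb in Akka Streams (and in other Reactive Streams implementations, such as RxJava or Project Reactor), you need to mark asynchronous boundaries explicitly. By default a stream runs on a single thread (or, in the case of Akka Streams, inside a single actor), and that includes operators such as groupBy. This may seem a bit counter-intuitive at first, but if you think about it, parallel execution is not really part of groupBy's semantics. You often do want parallel execution, since that is usually the real reason you apply groupBy in the first place, whether to use all available cores for a computational task or to call an external service in parallel and get better throughput. In those cases you have to code the parallelism explicitly. One way is to use async, as you did in your example, where the stream execution machinery itself introduces the parallelism. Alternatively, you can use mapAsync, where the parallelism is introduced by something outside the stream execution logic (the Futures returned by the mapping function, running on an ExecutionContext).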
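A minimal sketch of the mapAsync route, assuming Akka 2.6 and using the actor system's default dispatcher as the ExecutionContext. The object name `MapAsyncExample`, the parallelism value 4 and the `Thread.sleep` inside the Future are hypothetical stand-ins for real work or an external-service call. Here the stream stages stay fused (no .async), and the concurrency comes from the Futures. mapAsync emits results in upstream order; mapAsyncUnordered is the variant to reach for when order does not matter.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

object MapAsyncExample {
  val texts: List[String] = List(
    "I love Akka streams", "this has even characters", "this is amazing",
    "learning Akka at the Rock the JVM", "Let's rock the JVM", "123", "1234")

  def totalCharCount(implicit system: ActorSystem): Future[Int] = {
    // The Futures created below run on the actor system's default dispatcher
    implicit val ec: ExecutionContext = system.dispatcher

    Source(texts)
      .groupBy(2, (s: String) => s.length % 2)
      // Each substream may keep up to 4 Futures in flight at once; the stream
      // stages themselves remain fused in one actor.
      .mapAsync(parallelism = 4) { s =>
        Future {
          Thread.sleep(50) // hypothetical stand-in for real work; demo only
          s.length
        }
      }
      .mergeSubstreamsWithParallelism(2)
      .toMat(Sink.reduce[Int](_ + _))(Keep.right)
      .run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("MapAsyncExample")
    try println(s"total char count: ${Await.result(totalCharCount, 10.seconds)}")
    finally system.terminate()
  }
}
```

Blocking inside a Future on the default dispatcher is acceptable only in a toy like this; real code would use non-blocking I/O or a dedicated dispatcher for blocking calls.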