Does groupBy in Akka Streams create substreams that run in parallel?
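I built this groupBy -> map -> mergeSubstreamsWithParallelism example with Akka Streams. The course I am taking says that groupBy creates X substreams, depending on the parameter I pass to it, and that I then have to merge those substreams back into a single stream. So my understanding is that the map operator runs in parallel. Is that right?
If so, why do I see the same thread executing the map operator in this code: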
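import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.util.{Failure, Success}

// Assumes Akka 2.6+, where the implicit ActorSystem also supplies the materializer.
// The system name "groupByDemo" is only a placeholder.
implicit val system: ActorSystem = ActorSystem("groupByDemo")
import system.dispatcher // ExecutionContext required by onComplete

val textSource = Source(List(
  "I love Akka streams",               // odd
  "this has even characters",          // even
  "this is amazing",                   // odd
  "learning Akka at the Rock the JVM", // odd
  "Let's rock the JVM",                // even
  "123",                               // odd
  "1234"                               // even
))
val totalCharCountFuture = textSource
  .groupBy(2, string => string.length % 2) // one substream per parity of the length
  .map { c =>
    println(s"I am running on thread [${Thread.currentThread().getId}]")
    c.length
  } // .async // with this enabled, the operator runs in parallel
  .mergeSubstreamsWithParallelism(2)
  .toMat(Sink.reduce[Int](_ + _))(Keep.right)
  .run()
totalCharCountFuture.onComplete {
  case Success(value)     => println(s"total char count: $value")
  case Failure(exception) => println(s"failed computation: $exception")
}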
Output:
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
total char count: 116
Then I added .async so that the operator runs asynchronously. Now the output shows different threads executing the map operator:
I am running on thread [21]
I am running on thread [21]
I am running on thread [21]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I read what the Akka documentation says about asynchronous boundaries:
Put an asynchronous boundary around this Flow. If this is a SubFlow
(created e.g. by groupBy), this creates an asynchronous boundary
around each materialized sub-flow, not the super-flow. That way, the
super-flow will communicate with sub-flows asynchronously.
So, do I need the .async after the groupBy to ensure that all substreams execute in parallel? And does the test I am doing actually validate the parallelism of an operator in Akka Streams?
Thanks
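The short answer is "yes", you need async.
As a rule of thumb in Akka Streams (and in other Reactive Streams implementations such as RxJava or Project Reactor), you have to mark asynchronous boundaries explicitly. By default a stream executes on a single thread (or, in the case of Akka Streams, inside a single actor). That includes operators like groupBy. This may seem counterintuitive at first, but parallel execution is not actually required by the semantics of groupBy: it only partitions elements into substreams by key. In practice you very often do want parallelism, because that is the reason you applied groupBy in the first place, whether to use all available cores for some computation or to call an external service in parallel and get better throughput. In those cases you need to code for parallelism explicitly. One way is to use async, as you did in your example, in which case the stream materialization logic introduces the parallelism. Alternatively you can use mapAsync, where the parallelism is introduced by something outside the stream execution logic.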
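To illustrate the mapAsync route mentioned above, here is a minimal sketch that reuses the input from the question. It assumes Akka 2.6 or later (so an implicit ActorSystem is enough to materialize the stream); the object name GroupByMapAsyncSketch and the method totalCharCount are made up for this example. Here the parallelism comes from the Futures running on the dispatcher rather than from an async boundary in the stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object GroupByMapAsyncSketch {
  val lines: List[String] = List(
    "I love Akka streams",
    "this has even characters",
    "this is amazing",
    "learning Akka at the Rock the JVM",
    "Let's rock the JVM",
    "123",
    "1234"
  )

  def totalCharCount()(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher // ExecutionContext on which the Futures below run

    Source(lines)
      .groupBy(2, _.length % 2) // two substreams: odd and even length
      .mapAsync(parallelism = 2) { line =>
        // The work happens in a Future, outside the stream's own actor.
        Future {
          println(s"I am running on thread [${Thread.currentThread().getName}]")
          line.length
        }
      }
      .mergeSubstreams
      .toMat(Sink.reduce[Int](_ + _))(Keep.right)
      .run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("groupByMapAsync")
    try println(s"total char count: ${Await.result(totalCharCount(), 10.seconds)}")
    finally system.terminate()
  }
}
```

The total should again be 116, as in the question's output. The thread names printed will vary from run to run, and seeing more than one of them is only a hint of parallelism, not proof of it: with so few, such short elements a single pool thread may well handle every Future.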