Akka Stream 阶段不同时执行
Akka Stream stages not executing concurrently
我开始学习 Akka Streams 并且 运行 从这里开始第一个例子:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
object Main extends App {
implicit val system = ActorSystem("TestSystem")
implicit val mat = ActorMaterializer()
Source(1 to 10)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
}
根据示例,输出应该是不确定的,如下所示:
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
但是当我 运行 它时,它永远不会开始处理下一个元素,直到前一个元素被完全处理。
A: 1
B: 1
C: 1
A: 2
B: 2
C: 2
A: 3
B: 3
C: 3
A: 4
B: 4
C: 4
A: 5
B: 5
C: 5
A: 6
B: 6
C: 6
A: 7
B: 7
C: 7
A: 8
B: 8
C: 8
A: 9
B: 9
C: 9
A: 10
B: 10
C: 10
我也试过在每个stage/map中添加延迟(Thread.sleep),一次只处理一件事,就好像actor系统只有一个线程一样。我能够确认 Akka 调度程序有足够的线程。
import system.dispatcher
val start = System.currentTimeMillis()
( 1 to 10 ).map { i => Future { Thread.sleep( 1000 ); println( s"Finished ${i} after ${System.currentTimeMillis() - start}ms" ) }
输出:
Finished 5 after 1004ms
Finished 2 after 1004ms
Finished 6 after 1005ms
Finished 8 after 1004ms
Finished 4 after 1004ms
Finished 9 after 1005ms
Finished 7 after 1004ms
Finished 3 after 1005ms
Finished 1 after 1006ms
Finished 10 after 1009ms
是否需要进行一些调整才能让各个阶段同时处理?
我正在使用 Akka Streams 2.4.2 和 Java 1.8.0_65-b17。
我认为,您正在观察 Operator Fusion,这意味着所有三个 map
操作都在同一个 Actor 上执行。
可融合元素例如:
- 所有 GraphStage(这包括除 groupBy 之外的所有内置连接)
- 所有阶段(包括所有内置线性运算符)
- TCP 连接数
您可以通过传递完全禁用它的配置参数 akka.stream.materializer.auto-fusing=off
来禁用它(我不确定它通常是个好主意)或者您可以通过添加异步边界在代码中禁用它(有关更多详细信息,请参阅我所附的 link)。单个异步边界中的所有内容都在单个参与者上执行。
Source(1 to 10)
.map { i => println(s"A: $i"); i }.async
.map { i => println(s"B: $i"); i }.async
.map { i => println(s"C: $i"); i }.async
.runWith(Sink.ignore)
我开始学习 Akka Streams 并且 运行 从这里开始第一个例子:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
object Main extends App {
implicit val system = ActorSystem("TestSystem")
implicit val mat = ActorMaterializer()
Source(1 to 10)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
}
根据示例,输出应该是不确定的,如下所示:
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
但是当我 运行 它时,它永远不会开始处理下一个元素,直到前一个元素被完全处理。
A: 1
B: 1
C: 1
A: 2
B: 2
C: 2
A: 3
B: 3
C: 3
A: 4
B: 4
C: 4
A: 5
B: 5
C: 5
A: 6
B: 6
C: 6
A: 7
B: 7
C: 7
A: 8
B: 8
C: 8
A: 9
B: 9
C: 9
A: 10
B: 10
C: 10
我也试过在每个stage/map中添加延迟(Thread.sleep),一次只处理一件事,就好像actor系统只有一个线程一样。我能够确认 Akka 调度程序有足够的线程。
import system.dispatcher
val start = System.currentTimeMillis()
( 1 to 10 ).map { i => Future { Thread.sleep( 1000 ); println( s"Finished ${i} after ${System.currentTimeMillis() - start}ms" ) }
输出:
Finished 5 after 1004ms
Finished 2 after 1004ms
Finished 6 after 1005ms
Finished 8 after 1004ms
Finished 4 after 1004ms
Finished 9 after 1005ms
Finished 7 after 1004ms
Finished 3 after 1005ms
Finished 1 after 1006ms
Finished 10 after 1009ms
是否需要进行一些调整才能让各个阶段同时处理?
我正在使用 Akka Streams 2.4.2 和 Java 1.8.0_65-b17。
我认为,您正在观察 Operator Fusion,这意味着所有三个 map
操作都在同一个 Actor 上执行。
可融合元素例如:
- 所有 GraphStage(这包括除 groupBy 之外的所有内置连接)
- 所有阶段(包括所有内置线性运算符)
- TCP 连接数
您可以通过传递完全禁用它的配置参数 akka.stream.materializer.auto-fusing=off
来禁用它(我不确定它通常是个好主意)或者您可以通过添加异步边界在代码中禁用它(有关更多详细信息,请参阅我所附的 link)。单个异步边界中的所有内容都在单个参与者上执行。
Source(1 to 10)
.map { i => println(s"A: $i"); i }.async
.map { i => println(s"B: $i"); i }.async
.map { i => println(s"C: $i"); i }.async
.runWith(Sink.ignore)