重现 akka-stream 异步输出
Reproduce akka-stream async output
我是 akka-stream 的新手,所以想问一下如何重现本文中出现的行为 http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-rate.html
对于给定的代码
Source(1 to 3)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
得到这样的相似
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
我试过随机添加一些 Thread.sleep
,
从无限迭代器创建流。
但是 Akka 相应地调试输出总是使用相同的线程进行处理。
所以问题是:如何使用 akka-stream 重现异步行为(每个阶段都应该 运行 以异步方式)?
您看到顺序操作的原因是因为您的所有操作都来自同一个源,因此在同一个异步边界内。要获得您正在寻找的 "async behavior",您需要添加 Flows:
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
Source(1 to 3).via(Flow[Int].map{i => println(s"A: $i"); i })
.via(Flow[Int].map{i => println(s"B: $i"); i })
.via(Flow[Int].map{i => println(s"C: $i"); i })
.runWith(Sink.ignore)
每个 Flow 都将具体化为一个单独的 Actor。注意:要获得真正的并发性,ActorSystem
正在运行的线程池必须有 1 个以上的线程。
要记住一件事:ActorSystem 的好处是它承担了低级别操作控制的责任,以便开发人员可以专注于 "business logic"。这也可能是一个缺点。根据您的 ActorSystem 配置、JVM 配置和硬件配置,操作顺序可能仍然是同步的。
我是 akka-stream 的新手,所以想问一下如何重现本文中出现的行为 http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-rate.html
对于给定的代码
Source(1 to 3)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
得到这样的相似
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
我试过随机添加一些 Thread.sleep
,
从无限迭代器创建流。
但是 Akka 相应地调试输出总是使用相同的线程进行处理。
所以问题是:如何使用 akka-stream 重现异步行为(每个阶段都应该 运行 以异步方式)?
您看到顺序操作的原因是因为您的所有操作都来自同一个源,因此在同一个异步边界内。要获得您正在寻找的 "async behavior",您需要添加 Flows:
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
Source(1 to 3).via(Flow[Int].map{i => println(s"A: $i"); i })
.via(Flow[Int].map{i => println(s"B: $i"); i })
.via(Flow[Int].map{i => println(s"C: $i"); i })
.runWith(Sink.ignore)
每个 Flow 都将具体化为一个单独的 Actor。注意:要获得真正的并发性,ActorSystem
正在运行的线程池必须有 1 个以上的线程。
要记住一件事:ActorSystem 的好处是它承担了低级别操作控制的责任,以便开发人员可以专注于 "business logic"。这也可能是一个缺点。根据您的 ActorSystem 配置、JVM 配置和硬件配置,操作顺序可能仍然是同步的。