重现 akka-stream 异步输出

Reproduce akka-stream async output

我是 akka-stream 的新手,所以想问一下如何重现本文中出现的行为 http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-rate.html

对于给定的代码

Source(1 to 3)
  .map { i => println(s"A: $i"); i }
  .map { i => println(s"B: $i"); i }
  .map { i => println(s"C: $i"); i }
  .runWith(Sink.ignore)

得到这样的相似

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

我试过随机添加一些 Thread.sleep, 从无限迭代器创建流。 但是 Akka 相应地调试输出总是使用相同的线程进行处理。

所以问题是:如何使用 akka-stream 重现异步行为(每个阶段都应该 运行 以异步方式)?

您看到顺序操作的原因是因为您的所有操作都来自同一个源,因此在同一个异步边界内。要获得您正在寻找的 "async behavior",您需要添加 Flows:

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

Source(1 to 3).via(Flow[Int].map{i => println(s"A: $i"); i })
              .via(Flow[Int].map{i => println(s"B: $i"); i })
              .via(Flow[Int].map{i => println(s"C: $i"); i })
              .runWith(Sink.ignore)

每个 Flow 都将具体化为一个单独的 Actor。注意:要获得真正的并发性,ActorSystem 正在运行的线程池必须有 1 个以上的线程。

要记住一件事:ActorSystem 的好处是它承担了低级别操作控制的责任,以便开发人员可以专注于 "business logic"。这也可能是一个缺点。根据您的 ActorSystem 配置、JVM 配置和硬件配置,操作顺序可能仍然是同步的。