Akka 流图并行化

Akka Stream Graph parallelisation

我创建了一个 Graph,其中包含一个 Balance。这 Balance 将负载分配给 5 Flows。我所期望的是我的 Flow 的每个实例都会在单独的 Thread 上 运行。然而,事实并非如此。 当我打印 Thread 名称时,我注意到所有 Flows 都在同一个 Thread.

上执行

我使用的代码是:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
  val out = Sink.ignore

  val bal = builder.add(Balance[Int](5))
  val merge = builder.add(Merge[Int](5))

  val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
    println(Thread.currentThread())
    x
  }).async

  in ~> bal ~> f1 ~> merge ~> out
  bal ~> f2 ~> merge
  bal ~> f3 ~> merge
  bal ~> f4 ~> merge
  bal ~> f5 ~> merge

  ClosedShape
})

这输出:

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

我的期望是输出类似于:

Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

如何更改此代码示例以便 Flows 并行执行?

async 指令不保证您的阶段将在单独的线程中执行。只要阶段不在时间上重叠,它们可能 运行 在同一个线程上。

对于您的具体情况,执行的步骤可能如下:

  • merge 在第一个入口请求一个元素
  • 余额通过第一个流程服务元素
  • merge 在第二个入口请求一个元素
  • 平衡通过第二个流程服务元素
  • 等等

现在,如果您按如下方式更改余额

val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))

您将强制生成 5 个线程,步骤如下

  • 合并在第一个入口请求一个元素
  • merge 在第二个入口请求一个元素
  • 合并请求第 3 个入口的元素
  • 合并请求第 4 个入口的元素
  • 合并请求第 5 个入口的元素
  • balance 开始通过所有流为元素提供服务