Akka 流图并行化
Akka Stream Graph parallelisation
我创建了一个 Graph
,其中包含一个 Balance
。这 Balance
将负载分配给 5 Flows
。我所期望的是我的 Flow
的每个实例都会在单独的 Thread
上 运行。然而,事实并非如此。
当我打印 Thread
名称时,我注意到所有 Flows
都在同一个 Thread
.
上执行
我使用的代码是:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
val out = Sink.ignore
val bal = builder.add(Balance[Int](5))
val merge = builder.add(Merge[Int](5))
val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
println(Thread.currentThread())
x
}).async
in ~> bal ~> f1 ~> merge ~> out
bal ~> f2 ~> merge
bal ~> f3 ~> merge
bal ~> f4 ~> merge
bal ~> f5 ~> merge
ClosedShape
})
这输出:
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
我的期望是输出类似于:
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
如何更改此代码示例以便 Flows
并行执行?
async 指令不保证您的阶段将在单独的线程中执行。只要阶段不在时间上重叠,它们可能 运行 在同一个线程上。
对于您的具体情况,执行的步骤可能如下:
- merge 在第一个入口请求一个元素
- 余额通过第一个流程服务元素
- merge 在第二个入口请求一个元素
- 平衡通过第二个流程服务元素
- 等等
现在,如果您按如下方式更改余额
val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))
您将强制生成 5 个线程,步骤如下
- 合并在第一个入口请求一个元素
- merge 在第二个入口请求一个元素
- 合并请求第 3 个入口的元素
- 合并请求第 4 个入口的元素
- 合并请求第 5 个入口的元素
- balance 开始通过所有流为元素提供服务
我创建了一个 Graph
,其中包含一个 Balance
。这 Balance
将负载分配给 5 Flows
。我所期望的是我的 Flow
的每个实例都会在单独的 Thread
上 运行。然而,事实并非如此。
当我打印 Thread
名称时,我注意到所有 Flows
都在同一个 Thread
.
我使用的代码是:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(1 to 10)
val out = Sink.ignore
val bal = builder.add(Balance[Int](5))
val merge = builder.add(Merge[Int](5))
val f1, f2, f3, f4, f5 = Flow[Int].map(x => {
println(Thread.currentThread())
x
}).async
in ~> bal ~> f1 ~> merge ~> out
bal ~> f2 ~> merge
bal ~> f3 ~> merge
bal ~> f4 ~> merge
bal ~> f5 ~> merge
ClosedShape
})
这输出:
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
我的期望是输出类似于:
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
如何更改此代码示例以便 Flows
并行执行?
async 指令不保证您的阶段将在单独的线程中执行。只要阶段不在时间上重叠,它们可能 运行 在同一个线程上。
对于您的具体情况,执行的步骤可能如下:
- merge 在第一个入口请求一个元素
- 余额通过第一个流程服务元素
- merge 在第二个入口请求一个元素
- 平衡通过第二个流程服务元素
- 等等
现在,如果您按如下方式更改余额
val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))
您将强制生成 5 个线程,步骤如下
- 合并在第一个入口请求一个元素
- merge 在第二个入口请求一个元素
- 合并请求第 3 个入口的元素
- 合并请求第 4 个入口的元素
- 合并请求第 5 个入口的元素
- balance 开始通过所有流为元素提供服务