使用 Akka Streams 的工人池

Pool of workers with Akka Streams

akka streams documentation中所述,我尝试创建一个工作池(流):

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

然后我用这个函数运行一个并行流:

def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }

我得到了预期的输出 1, 2, 3, 4, 5, 6, 7, 8, 9 但数字以每秒 1 个的速率出现(无并行性)。我做错了什么?

该部分中的文档已过时,将在下一版本中修复。基本上您只需要在流本身上调用 .async 即可。通过这样做,您可以在流周围画一个 "box"(您可以将其想象为一个具有一个输入和输出端口的盒子),这将防止在该盒子上融合。通过这样做,基本上所有的工人都将在专门的演员身上。图的其余部分(广播和合并阶段)将共享另一个参与者(它们不会 运行 在单独的参与者上,异步框仅保护流,外部的东西仍然会被融合)。

正如 Endre Varga 指出的那样,流程本身应该用 .async 标记。

但即便如此,行为也不是确定性的,因为异步阶段的默认缓冲区大小为 16,并且平衡器可能会将所有消息发送给同一个工作人员。

因此,balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge 会导致预期的行为。

项目成员给出的答案见: https://github.com/akka/akka/issues/20146#issuecomment-201381356