像流并行

Akka Stream Parallelism

根据文档[1],我一直在尝试并行化 Akka Stream 中的流,但由于某种原因,我没有得到预期的结果。

我按照文档中概述的步骤进行操作,我认为我没有遗漏任何内容。然而,我的流的计算都是按顺序一个接一个地发生的。

我在这里错过了什么?

[1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println(s"Computing 1 ${x}")
    Thread.sleep(10000)
    println(s"Computation 1 ${x} done")
    x
  }
  def longRunningComputation2(x: Int): Int = {
    println(s"Computing 2 ${x}")
    Thread.sleep(10000)
    println(s"Computation 2 ${x} done")
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)
      val f2 = Flow[Int].map(longRunningComputation2)


      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f2.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })


  // Wire it all up.
  val xs = List(1,2,3)
  val source: Source[Int, NotUsed] = Source(xs)
  source.via(processor).runForeach(println)


  Thread.sleep(5000)
}

示例输出

Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3

我希望看到两个计算同时发生。例如:

Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..

尝试删除 longRunningComputationlongRunningComputation2 中的 Thread.sleep 并将 xs 设置为更长的内容,例如 1 to 100,然后您将能够观察并行处理。不知道为什么,但是阻塞 Thread.sleep 在 akka

中绝对被认为是反模式