Akka-streams backpressure on broadcast with async processing

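I'm trying to understand whether akka-stream applies backpressure to the Source when one branch of a Broadcast in the graph takes a long time (asynchronously) to process its elements.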

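I tried buffer and batch to see whether any backpressure is applied to the source, but it doesn't look like it. I also tried flushing System.out, but that didn't change anything.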

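import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object Test extends App {
  /* Necessary for akka stream */
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val in = Source.tick(0.seconds, 1.second, 1)
    // Note: runForeach materializes a separate, independent copy of the tick
    // source, so these "Produced" lines do not come from the graph below.
    in.runForeach(i => println("Produced " + i))

    val out = Sink.foreach(println)
    val out2 = Sink.foreach[Int] { o => println(s"2 $o") }

    val bcast = builder.add(Broadcast[Int](2))

    // Sums incoming elements while downstream is backpressuring (up to 4 at a time)
    val batchedIn: Source[Int, Cancellable] = in.batch(4, identity[Int]) {
      case (s, v) => println(s"Batched ${s + v}"); s + v
    }

    val f2 = Flow[Int].map(_ + 10)
    // Deliberately slow stage
    val f4 = Flow[Int].map { i => Thread.sleep(2000); i }

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4.async ~> out2
    ClosedShape
  })

  g.run()
}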

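When I run the program, I expect to see "Batched ..." in the console, and at some point I expect it to stall temporarily because f4 is not fast enough to process the values. Currently, none of this behaves as expected: the numbers are produced continuously and no batching happens.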

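Edit: I noticed that after a while, the "Batched" messages do start being printed to the console. I still don't understand why this doesn't happen sooner, since backpressure should kick in from the very first element.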

The reason for this behavior is the internal buffers that Akka introduces when you set an asynchronous boundary.

Buffers for asynchronous operators

Internal buffers are introduced as an optimization when using asynchronous operators.


While pipelining in general increases throughput, in practice there is a cost of passing an element through the asynchronous (and therefore thread crossing) boundary which is significant. To amortize this cost Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer but multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary.
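The windowed, batching strategy described in the quote can be illustrated with a small, stdlib-only Scala model. This is a hypothetical simplification and not Akka's internal code: WindowedDemandModel, requests, windowSize and batchSize are names made up for this sketch. It records the demand requests a consumer would send upstream if it first requests a full window and then asks for more only after batchSize elements have been drained.

```scala
// Hypothetical model of a windowed, batching demand strategy.
// It is NOT Akka's implementation; it only mirrors the idea in the quote.
object WindowedDemandModel {

  /** Returns the sizes of the demand requests a consumer sends upstream while
    * draining `total` elements through a window of `windowSize` slots,
    * re-requesting only after `batchSize` elements have been drained.
    */
  def requests(total: Int, windowSize: Int, batchSize: Int): Vector[Int] = {
    require(total >= 0 && windowSize > 0)
    require(batchSize > 0 && batchSize <= windowSize)

    val sent = Vector.newBuilder[Int]
    // Windowed: request a whole window up front, so several elements can be in flight.
    var requested = math.min(windowSize, total)
    if (requested > 0) sent += requested

    var drainedSinceLastRequest = 0
    for (_ <- 1 to total) {
      // One element arrives and is drained from the window buffer.
      drainedSinceLastRequest += 1
      val stillNeeded = total - requested
      // Batching: do not re-request per element, only once batchSize slots are free.
      if (drainedSinceLastRequest >= batchSize && stillNeeded > 0) {
        val n = math.min(drainedSinceLastRequest, stillNeeded)
        sent += n
        requested += n
        drainedSinceLastRequest = 0
      }
    }
    sent.result()
  }

  def main(args: Array[String]): Unit = {
    // Stop-and-wait (window of 1): one request message per element.
    println(requests(total = 10, windowSize = 1, batchSize = 1))
    // Window of 4, refilled two slots at a time: far fewer request messages.
    println(requests(total = 10, windowSize = 4, batchSize = 2))
  }
}
```

For ten elements, the stop-and-wait case sends ten request messages of size 1, while a window of 4 refilled two slots at a time sends four (4, 2, 2, 2). The total demand is the same; only the number of messages crossing the boundary drops, which is the cost the quote says this strategy amortizes.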

I know this is a toy stream, but if you explain what you are trying to achieve, I'll do my best to help.
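Given that explanation, one way to make backpressure reach the source sooner while keeping the async boundary is to shrink that internal buffer with Attributes.inputBuffer. The sketch below assumes the Akka 2.5/2.6-style API used in the question; SmallAsyncBuffer and run are hypothetical names, and the finite Source(1 to 6) stands in for the tick source so the stream completes.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SmallAsyncBuffer {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("small-async-buffer")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Slow stage behind an async boundary (like f4.async in the question),
    // with the input buffer of that async section reduced to one element.
    val slow = Flow[Int]
      .map { i => Thread.sleep(200); println(s"Slow stage processed $i"); i }
      .async
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

    val result = Source(1 to 6)
      // batch only aggregates while downstream is backpressuring
      .batch(4, (i: Int) => i) { (sum, next) => println(s"Batched ${sum + next}"); sum + next }
      .via(slow)
      .runWith(Sink.seq)

    try Await.result(result, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Which elements get grouped into each "Batched ..." line depends on timing, but the emitted values always add up to 21, because batch only sums elements and never drops them.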

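You need mapAsync instead of async. With mapAsync(1) the slow work runs inside a Future, there is no extra async boundary (and therefore no internal boundary buffer) in front of the slow branch, and the stage only requests its next element after the current Future's result has been emitted: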

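import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object Test extends App {
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Log each element as the tick source produces it
    val in = Source.tick(0.seconds, 1.second, 1).map { x => println(s"Produced $x"); x }

    val out = Sink.foreach[Int] { o => println(s"F2 processed $o") }
    val out2 = Sink.foreach[Int] { o => println(s"F4 processed $o") }

    val bcast = builder.add(Broadcast[Int](2))

    // Small explicit buffer that backpressures the source once it is full
    val batchedIn: Source[Int, Cancellable] = in.buffer(4, OverflowStrategy.backpressure)

    val f2 = Flow[Int].map(_ + 10)
    // At most one Future in flight; the next element is requested only after
    // the current Future's result has been emitted downstream
    val f4 = Flow[Int].mapAsync(1) { i =>
      Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher)
    }

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4 ~> out2
    ClosedShape
  }).run()
}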
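The key property of mapAsync(1) here is that the stage never has more than one Future running, so the slow branch cannot run ahead of the rest of the graph. The following sketch checks that by counting concurrently running Futures; MapAsyncInFlight, run and the AtomicInteger counters are hypothetical names for illustration, and the Akka 2.5/2.6-style API from the question is assumed.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncInFlight {
  /** Runs `n` slow Futures through mapAsync(parallelism) and returns the
    * results plus the highest number of Futures observed running at once. */
  def run(parallelism: Int, n: Int = 8): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("map-async-in-flight")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    val done = Source(1 to n)
      .mapAsync(parallelism) { i =>
        Future {
          val now = inFlight.incrementAndGet()
          maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
          Thread.sleep(50) // simulated slow work, like f4
          inFlight.decrementAndGet()
          i
        }
      }
      .runWith(Sink.seq)

    try {
      val results = Await.result(done, 10.seconds)
      (results, maxInFlight.get())
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(run(parallelism = 1))
    println(run(parallelism = 4))
  }
}
```

With parallelism 1 the maximum is always 1; with a higher parallelism up to that many Futures may overlap, while mapAsync still emits results in input order.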