检测背压正在发生

Detecting that back pressure is happening

我的 Akka HTTP 应用程序通过服务器发送的事件流式传输一些数据,客户端可以请求比它们能够处理的更多的事件。代码看起来像这样

complete {
  source.filter(predicate.isMatch)
   .buffer(1000, OverflowStrategy.dropTail)
   .throttle(20, 1 second)
   .map { evt => ServerSentEvent(evt) }
}

有没有一种方法可以检测阶段背压并以某种方式通知客户端最好使用相同的接收器(通过发出不同类型的输出)或者如果不可能就让 Akka 框架调用某种回调将通过控制侧通道处理事实?

所以,我不确定我是否理解您的用例。您是在询问 .buffer 还是 .throttle 处的背压?我感到困惑的另一部分是,您建议在流已经背压的情况下发出一个新的 "control" 元素。所以你的控制元素可能有一段时间没有收到。此外,如果您每次收到背压时都发出一个控制元素,您可能会产生大量控制元素。

构建此(过于天真的)解决方案的一种方法是使用 conflate

  val simpleSink: Sink[String, Future[Done]] =
    Sink.foreach(e => println(s"simple: $e"))

  val cycleSource: Source[String, NotUsed] =
    Source.cycle(() => List("1", "2", "3", "4").iterator).throttle(5, 1.second)

  val conflateFlow: Flow[String, String, NotUsed] =
    Flow[String].conflate((a, b) => {
      "BACKPRESSURE CONTROL ELEMENT"
    })

  val backpressureFlow: Flow[String, String, NotUsed] =
    Flow[String]
      .buffer(10, OverflowStrategy.backpressure) throttle (2, 1.second)

  val backpressureTest =
    cycleSource.via(conflateFlow).via(backpressureFlow).to(simpleSink).run()

要将其变成更有用的示例,您可以:

  1. .conflate 中进行某种调用(然后只删除其中一个元素)。但是要小心不要做任何阻塞。也许只是发送一条可以在其他地方删除重复数据的消息。

  2. 写一个custom graph stage。像这样简单的事情做起来不会太难。

我想我必须了解更多关于用例的信息。查看所有现成的 backpressure aware operators,看看其中一个是否有帮助。