如何背压 ActorPublisher

How to backpressure a ActorPublisher

我正在编写一些示例来了解 akka 流和背压。我正在尝试查看缓慢的消费者背压如何影响 AkkaPublisher

我的代码如下

class DataPublisher extends ActorPublisher[Int] {

  import akka.stream.actor.ActorPublisherMessage._

  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      println(s"Producer buffer size ${items.size}")
      if (totalDemand == 0)
        items = items :+ s.toInt
      else
        onNext(s.toInt)

    case Request(demand) =>
      if (demand > items.size) {
        items foreach (onNext)
        items = List.empty
      }
      else {
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }
}

Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)

其中接收器是一个订阅者,它正在休眠以模拟慢速消费者。无论如何,出版商都会继续生产数据。

--编辑-- 我的问题是当需求为 0 时以编程方式缓冲数据。我如何利用背压来减慢发布者的速度

类似

throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())

这不会影响发布者,它的缓冲区会继续运行。

谢谢, 萨吉特

这可以通过一个中间过程来完成 Flow.buffer:

val flowBuffer = Flow[Int].buffer(10, OverflowStrategy.dropHead)

Source
  .fromPublisher(ActorPublisher[Int](dataPublisherRef))
  .via(flowBuffer)
  .runWith(sink)

不要使用 ActorPublisher

首先,不要使用 ActorPublisher - 它是一个非常低级的 已弃用 API。我们决定弃用,因为用户不应该在 Akka Streams 中处理如此低的抽象级别。

其中一件棘手的事情正是您要问的——处理背压完全掌握在编写 ActorPublisher 的开发人员手中,如果他们使用此 API.因此,您必须接收 Request(n) 消息,并确保您 永远不会 发出比您收到的请求更多的元素信号。此行为在 Reactive Streams Specification 中指定,您必须正确实施。基本上,您会接触到 Reactive Streams 的所有复杂性(这是一个完整的规范,有许多边缘案例——免责声明:我 was/am 开发 Reactive Streams 和 Akka Streams 的一部分)。

显示背压在 GraphStage 中的表现

其次,要构建自定义阶段,您应该使用专为它设计的 API:GraphStage。请注意,这样的阶段也很低。通常 Akka Streams 的用户不需要编写自定义阶段,但是如果他们要实现一些内置阶段不提供的逻辑,那么编写自己的阶段是绝对期望的并且很好。

这是来自 Akka 代码库的简化过滤器实现:


case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
  override def initialAttributes: Attributes = DefaultAttributes.filter

  override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

    override def onPush(): Unit = {
      val elem = grab(in)
      if (p(elem)) push(out, elem)
      else pull(in)
    }

    // this method will NOT be called, if the downstream has not signalled enough demand!
    // this method being NOT called is how back-pressure manifests in stages
    override def onPull(): Unit = pull(in)

    setHandlers(in, out, this)
  }
}

如您所见,您无需自己实现整个 Reactive Streams 逻辑和规则(这很难),您会得到像 onPushonPull 这样的简单回调。 Akka Streams 处理需求管理,如果下游发出需求信号,它会自动调用 onPull,如果没有需求,它不会调用它——这意味着下游正在向这个阶段施加背压。