如何背压 ActorPublisher
How to backpressure a ActorPublisher
我正在编写一些示例来了解 akka 流和背压。我正在尝试查看缓慢的消费者背压如何影响 AkkaPublisher
我的代码如下
class DataPublisher extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[Int] = List.empty
def receive = {
case s: String =>
println(s"Producer buffer size ${items.size}")
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size) {
items foreach (onNext)
items = List.empty
}
else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
和
Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)
其中接收器是一个订阅者,它正在休眠以模拟慢速消费者。无论如何,出版商都会继续生产数据。
--编辑--
我的问题是当需求为 0 时以编程方式缓冲数据。我如何利用背压来减慢发布者的速度
类似
throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())
这不会影响发布者,它的缓冲区会继续运行。
谢谢,
萨吉特
这可以通过一个中间过程来完成 Flow.buffer
:
val flowBuffer = Flow[Int].buffer(10, OverflowStrategy.dropHead)
Source
.fromPublisher(ActorPublisher[Int](dataPublisherRef))
.via(flowBuffer)
.runWith(sink)
不要使用 ActorPublisher
首先,不要使用 ActorPublisher
- 它是一个非常低级的 已弃用 API。我们决定弃用,因为用户不应该在 Akka Streams 中处理如此低的抽象级别。
其中一件棘手的事情正是您要问的——处理背压完全掌握在编写 ActorPublisher 的开发人员手中,如果他们使用此 API.因此,您必须接收 Request(n)
消息,并确保您 永远不会 发出比您收到的请求更多的元素信号。此行为在 Reactive Streams Specification 中指定,您必须正确实施。基本上,您会接触到 Reactive Streams 的所有复杂性(这是一个完整的规范,有许多边缘案例——免责声明:我 was/am 开发 Reactive Streams 和 Akka Streams 的一部分)。
显示背压在 GraphStage 中的表现
其次,要构建自定义阶段,您应该使用专为它设计的 API:GraphStage
。请注意,这样的阶段也很低。通常 Akka Streams 的用户不需要编写自定义阶段,但是如果他们要实现一些内置阶段不提供的逻辑,那么编写自己的阶段是绝对期望的并且很好。
这是来自 Akka 代码库的简化过滤器实现:
case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
// this method will NOT be called, if the downstream has not signalled enough demand!
// this method being NOT called is how back-pressure manifests in stages
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
如您所见,您无需自己实现整个 Reactive Streams 逻辑和规则(这很难),您会得到像 onPush
和 onPull
这样的简单回调。 Akka Streams 处理需求管理,如果下游发出需求信号,它会自动调用 onPull
,如果没有需求,它不会调用它——这意味着下游正在向这个阶段施加背压。
我正在编写一些示例来了解 akka 流和背压。我正在尝试查看缓慢的消费者背压如何影响 AkkaPublisher
我的代码如下
class DataPublisher extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[Int] = List.empty
def receive = {
case s: String =>
println(s"Producer buffer size ${items.size}")
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size) {
items foreach (onNext)
items = List.empty
}
else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
和
Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)
其中接收器是一个订阅者,它正在休眠以模拟慢速消费者。无论如何,出版商都会继续生产数据。
--编辑-- 我的问题是当需求为 0 时以编程方式缓冲数据。我如何利用背压来减慢发布者的速度
类似
throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())
这不会影响发布者,它的缓冲区会继续运行。
谢谢, 萨吉特
这可以通过一个中间过程来完成 Flow.buffer
:
val flowBuffer = Flow[Int].buffer(10, OverflowStrategy.dropHead)
Source
.fromPublisher(ActorPublisher[Int](dataPublisherRef))
.via(flowBuffer)
.runWith(sink)
不要使用 ActorPublisher
首先,不要使用 ActorPublisher
- 它是一个非常低级的 已弃用 API。我们决定弃用,因为用户不应该在 Akka Streams 中处理如此低的抽象级别。
其中一件棘手的事情正是您要问的——处理背压完全掌握在编写 ActorPublisher 的开发人员手中,如果他们使用此 API.因此,您必须接收 Request(n)
消息,并确保您 永远不会 发出比您收到的请求更多的元素信号。此行为在 Reactive Streams Specification 中指定,您必须正确实施。基本上,您会接触到 Reactive Streams 的所有复杂性(这是一个完整的规范,有许多边缘案例——免责声明:我 was/am 开发 Reactive Streams 和 Akka Streams 的一部分)。
显示背压在 GraphStage 中的表现
其次,要构建自定义阶段,您应该使用专为它设计的 API:GraphStage
。请注意,这样的阶段也很低。通常 Akka Streams 的用户不需要编写自定义阶段,但是如果他们要实现一些内置阶段不提供的逻辑,那么编写自己的阶段是绝对期望的并且很好。
这是来自 Akka 代码库的简化过滤器实现:
case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
// this method will NOT be called, if the downstream has not signalled enough demand!
// this method being NOT called is how back-pressure manifests in stages
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
如您所见,您无需自己实现整个 Reactive Streams 逻辑和规则(这很难),您会得到像 onPush
和 onPull
这样的简单回调。 Akka Streams 处理需求管理,如果下游发出需求信号,它会自动调用 onPull
,如果没有需求,它不会调用它——这意味着下游正在向这个阶段施加背压。