Akka Stream Source.queue 的背压策略不起作用

Backpressure strategies for Akka Stream Source.queue not working

我试图理解为什么下面的代码片段正在做它正在做的事情。我本以为,因为 Sink 产生需求的速度不能快于 Source 产生内容的速度,所以我会收到丢弃的消息以响应某些优惠(溢出策略设置为 Drop Buffer)以及错误和队列关闭消息自毁片后。

片段:

package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {

  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {

  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
   }
}

输出:

Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...

我认为我的误解是关于在 QueueSource class 中使用 getAsyncCallback。即使 QueueSource 中的 offer 调用使用正确的 Offer 详细信息调用 stageLogic,在前一个元素完成处理之前不会调用阶段逻辑中此代码的实际处理程序,因此用于检查缓冲区的逻辑 none正在应用大小或应用溢出策略... :-/

要查看您期望的结果,您应该在 SourceSink 之间添加一个 async 阶段。这是一种告诉 Akka 运行 使用两个不同的 Actor 的两个阶段的方法——通过强制两者之间的异步边界。

如果没有 async,Akka 将通过粉碎一个 actor 中的所有内容来优化执行,这将使处理顺序化。在您的示例中,正如您所注意到的,一条消息被 offer 编辑到队列中,直到上一条消息的 Thread.sleep(150) 完成。 可以找到有关该主题的更多信息 here

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .async
    .to(Sink.foreach[Int] {...}).run()

另外,在匹配.offer结果的时候,需要多加一个case。这是 FutureFailure,当下游队列失败时 Future 完成。这适用于前 5

之后 offered 的所有消息
  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).onComplete {
        case Success(Enqueued) => println(s"$num Enqueued ${LocalDateTime.now}")
        case Success(Dropped) => println(s"$num Dropped ${LocalDateTime.now}")
        case Success(Failure(err)) => println(s"$num Failed ${LocalDateTime.now} $err")
        case Success(QueueClosed) => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
        case util.Failure(err) => println(s"$num Failed ${LocalDateTime.now} with exception $err")
      }
  }

请注意,即使执行了上述所有操作,您也不会看到任何 QueueOfferResult.Dropped 结果。那是因为你选择了DropBuffer策略。每个传入的消息都将排队(因此产生 Enqueued 消息),踢出现有缓冲区。如果将策略更改为 DropNew,您应该会开始看到一些 Dropped 消息。

我找到了我在评论中写的问题的答案,我认为它与原始问题非常相关,所以我想将其添加为答案(但正确答案是来自 stefano 的答案)。

导致此行为的元素是 缓冲区,但不是我们明确配置的缓冲区,例如 map.(...).buffer(1,OverflowStrategy.dropBuffer).async,而是构建于物化。此缓冲区专门为提高性能而实施,并且是在实现时执行的蓝图优化的一部分。

While pipelining in general increases throughput, in practice there is a cost of passing an element through the asynchronous (and therefore thread crossing) boundary which is significant. To amortize this cost Akka Streams uses a windowed, batching backpressure strategy internally. It is windowed because as opposed to a Stop-And-Wait protocol multiple elements might be “in-flight” concurrently with requests for elements. It is also batching because a new element is not immediately requested once an element has been drained from the window-buffer but multiple elements are requested after multiple elements have been drained. This batching strategy reduces the communication cost of propagating the backpressure signal through the asynchronous boundary.

关于 internal buffers is close to explicit buffers and are part of the "working with rate" 部分的文档并非偶然。

BatchingActorInputBoundary 具有 inputBuffer

  /* Bad: same number of emitted and consumed events, i.e. DOES NOT DROP
  Emitted: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Emitted: 1
  Emitted: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  Consumed: 1
  */
  def example1() {
    val c = Source.tick(500 millis, 500 millis, 1)
      .map(x => {
        println("Emitted: " + x)
        x
      })
      .buffer(1, OverflowStrategy.dropBuffer).async
      .toMat(Sink.foreach[Int](x => {
        Thread.sleep(5000)
        println("Consumed: " + x)
      }))(Keep.left)
      .run
    Thread.sleep(3000)
    c.cancel()

}

上面导致意外(对我来说!)行为的示例可以是 "solved" 通过

减小内部缓冲区的大小
.toMat(Sink.foreach[Int](x => {
            Thread.sleep(5000)
            println("Consumed: " + x)
          }))
          (Keep.left).addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

现在,一些来自上游的元素被丢弃,但有一个大小为 1 的最小输入缓冲区,我们得到以下输出:

Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Emitted: 1
Consumed: 1
Consumed: 1
Consumed: 1

我希望这个回答能为 stefano 的回答增加价值。

akka团队永远领先一步

In general, when time or rate driven processing stages exhibit strange behavior, one of the first solutions to try should be to decrease the input buffer of the affected elements to 1.

** 更新:**

Konrad Malawski 认为这是一个 活泼的解决方案 并建议我将此行为实现为 GraphStage。在这里。

class LastElement[A] extends GraphStage[FlowShape[A,A]] {
    private val in = Inlet[A]("last-in")
    private val out = Outlet[A]("last-out")

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      var pushPending: Option[A] = None

      override def preStart(): Unit = pull(in)

      def pushIfAvailable() = if (isAvailable(out)) {
        pushPending.foreach(p => {
          push(out, p)
          pushPending = None
        })
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pushIfAvailable
      })

      setHandler(in,new InHandler {
        override def onPush(): Unit = {
          pushPending = Some(grab(in))
          pushIfAvailable
          pull(in)
        }
      })

    }

    override def shape: FlowShape[A, A] = FlowShape(in,out)
  }