为什么 Akka Streams TestSource 不接收每个拉取请求?

Why does the Akka Streams TestSource not receive every pull as a request?

我正在尝试追踪自定义流中的错误,其中有额外的请求向上游。我写了一个测试用例,考虑到我看到的行为和日志记录,它应该失败了,但它没有。

然后我做了一个带有明显额外拉动的简单流程,但测试仍然没有失败。

这是一个例子:

class EagerFlow extends GraphStage[FlowShape[String, String]] {

  val in: Inlet[String] = Inlet("EagerFlow.in")
  val out: Outlet[String] = Outlet("EagerFlow.out")

  override def shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = {
        println("EagerFlow.in.onPush")
        println("EagerFlow.out.push")
        push(out, grab(in))
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = {
        println("EagerFlow.out.onPull")
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
  }
}

这是一个应该通过的测试,即确认有一个额外的请求。

"Eager Flow" should "pull on every push" in {
  val sourceProbe = TestSource.probe[String]
  val sinkProbe = TestSink.probe[String]

  val eagerFlow = Flow.fromGraph(new EagerFlow)

  val (source, sink) = sourceProbe.
    via(eagerFlow).
    toMat(sinkProbe)(Keep.both).
    run

  sink.request(1)
  source.expectRequest()
  source.expectNoMsg()

  source.sendNext("hello")
  sink.expectNext()
  source.expectRequest()
}

在标准输出中你可以看到:

0C3E08A8 PULL DownstreamBoundary -> TestSpec$EagerFlow@1d154180 (TestSpec$EagerFlow$$anon$$anon@7e62ecbd) [TestSpec$EagerFlow$$anon@36904dd4]
EagerFlow.out.onPull
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
EagerFlow.in.onPush
EagerFlow.out.push
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]

assertion failed: timeout (3 seconds) during expectMsg:
  java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
  at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737)
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665)
at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172)

(这是在 akka.stream.impl.fusing.GraphInterpreter#processEvent 中的调试日志记录,实际上使用的是打印点,但实际上是一样的)

为什么这个拉力在 UpstreamBoundary 停止并且再也没有回到 TestSource.probe

我有很多测试使用 expectNoMsg() 来确保正确的背压,但似乎这些只会给出误报。如果这不起作用,我应该如何测试它?

为了使您的测试适用,您需要将 Flow 缓冲区的大小设置为 1。

val settings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 1, maxSize = 1)
implicit val mat = ActorMaterializer(settings)

默认尺寸为:

  # Initial size of buffers used in stream elements
  initial-input-buffer-size = 4
  # Maximum size of buffers used in stream elements
  max-input-buffer-size = 16

这意味着 Akka 会在您第一次请求时急切地请求并摄取最多 16 个元素anyway。有关内部缓冲区的更多信息,请参见 here.