使用流式 ServiceCall 的 Lagom 服务之间的背压不起作用

Backpressure between Lagom services using streaming ServiceCall not working

Lagom: 1.5.4

考虑进行 ServiceCall (example):

def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}

当另一个服务 (example) 通过以下方式使用此 ServiceCall 时:

val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg")
    msg
}.runWith(Sink.seq))

你会期望人为地减慢消费者会减慢生产者。 查看日志,情况似乎并非如此:

sending 1
sending 2
sending 3
[...]
sending 1000

[1 second pause]

received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]

我是否遗漏了任何隐藏的缓冲区?

示例代码:
https://github.com/an-tex/lagom-backpressure

运行 sbt runAll

然后执行curl 127.0.0.1:[port of hello-world-stream-client service]/api/test

看效果

存在超出测试大小的系统缓冲区。在 Mac OS 上似乎有一个 128kb(512kb 突发)缓冲区。在缓冲区之外,背压就像一个魅力。我已经用更大的测试大小更新了 github repo,以防有人想玩。

归功于 TimMoore who answered this question on Lightbend Discuess