Backpressure between Lagom services using streaming ServiceCall not working
Lagom: 1.5.4
Consider the following streaming ServiceCall (example):
def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}
When another service (example) consumes this ServiceCall like this:
val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg")
    msg
}.runWith(Sink.seq))
you would expect that artificially slowing down the consumer would also slow down the producer. Looking at the logs, this doesn't seem to be the case:
sending 1
sending 2
sending 3
[...]
sending 1000
[1 second pause]
received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]
Am I missing some hidden buffer?
Example code:
https://github.com/an-tex/lagom-backpressure
Run sbt runAll, then execute curl 127.0.0.1:[port of hello-world-stream-client service]/api/test to see the effect.
There are system-level buffers larger than the test size. On macOS there appears to be a 128 kB (512 kB burst) socket buffer; the 1000 small messages in the example apparently fit entirely into it, so the producer never has to wait. Once the stream exceeds those buffers, backpressure works like a charm. I have updated the GitHub repo with a larger test size in case anyone wants to play around with it.
Credit goes to TimMoore, who answered this question on Lightbend Discuss.
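For anyone who wants to reproduce the effect without checking out the updated repo, here is a minimal sketch of a "larger" producer. The element count, the 1 kB padding, and the reuse of the same log identifier are illustrative assumptions on my part, not the exact code from the repo. The idea is simply to pad each element to roughly 1 kB so the stream quickly exceeds the ~128 kB (512 kB burst) socket buffer, at which point the producer's "sending" log lines should start pausing until the slowed-down consumer catches up:

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.ServiceCall
import scala.concurrent.Future

def stream: ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1 to 10000)                          // ~10 MB in total, far beyond the socket buffers
      .map(i => s"$i-${"x" * 1024}")            // pad each element to roughly 1 kB
      .wireTap(msg => log.info(s"sending ${msg.takeWhile(_ != '-')}"))
  )
}

With the same zip-with-ticker consumer as above, the producer should emit an initial burst of "sending" lines until the buffers fill, and then advance roughly in lockstep with the "received" lines.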