无法通过 spring 网络反应表现出背压
Not able to exhibit backpressure with spring web reactive
我正在尝试使用 spring-web-reactive 来展示背压,就像这里用 akka 显示的那样 - https://www.youtube.com/watch?v=oS9w3VenDW0
(在 28:20 和 29:20 之间观看)。
为了尝试,我使用了下面来自 github https://github.com/bclozel/spring-boot-web-reactive
的示例项目
设置项目后,我在 HomeController.java 中添加了一个新端点,如下所示:
@RequestMapping(value = "/longflux",produces = "application/stream+json")
public Flux<Long> longFlux(){
return Flux.interval(Duration.ofMillis(10)).log();
}
现在,如果我尝试卷曲此端点,然后使用 (CTRL+z) 将其挂起,一旦 tcp 缓冲区已满且服务器应停止发出事件,背压就会启动。
但是,在一段时间后暂停 curl 命令会抛出以下异常:
2017-02-16 08:49:48.480 ERROR 3500 --- [ timer-1] reactor.Flux.Interval.4 : onError(reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests)
2017-02-16 08:49:48.481 ERROR 3500 --- [ timer-1] reactor.Flux.Interval.4 :
reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:151) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:98) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at reactor.core.scheduler.SingleTimedScheduler$TimedPeriodicScheduledRunnable.run(SingleTimedScheduler.java:394) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
我无法理解为什么请求在 curl 命令暂停后的某个时间异常终止(在 spring-web-reactive 实现中),而在 akka 示例中(如 youtube 中所示) link) 一旦 tcp 缓冲区已满,服务器就停止发布事件。
Flux.interval
是一个特例,因为它是一个热源并且时间没有被 Reactor 缓冲;这意味着如果您的请求周期由于背压而变慢并且您的间隔源生成速度更快,Reactor 将发出一个错误信号。
您可以使用 .onBackpressureDrop()
运算符更新此示例,以在出现背压时降低间隔。这应该按预期运行。
有很多方法可以说明背压,包括:
- 使用
delay
运算符延迟订阅
- 模拟多个慢速客户端(带宽和延迟)
我正在尝试使用 spring-web-reactive 来展示背压,就像这里用 akka 显示的那样 - https://www.youtube.com/watch?v=oS9w3VenDW0 (在 28:20 和 29:20 之间观看)。
为了尝试,我使用了下面来自 github https://github.com/bclozel/spring-boot-web-reactive
的示例项目设置项目后,我在 HomeController.java 中添加了一个新端点,如下所示:
@RequestMapping(value = "/longflux",produces = "application/stream+json")
public Flux<Long> longFlux(){
return Flux.interval(Duration.ofMillis(10)).log();
}
现在,如果我尝试卷曲此端点,然后使用 (CTRL+z) 将其挂起,一旦 tcp 缓冲区已满且服务器应停止发出事件,背压就会启动。
但是,在一段时间后暂停 curl 命令会抛出以下异常:
2017-02-16 08:49:48.480 ERROR 3500 --- [ timer-1] reactor.Flux.Interval.4 : onError(reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests)
2017-02-16 08:49:48.481 ERROR 3500 --- [ timer-1] reactor.Flux.Interval.4 :
reactor.core.Exceptions$OverflowException: Could not emit value 2578 due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:151) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:98) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at reactor.core.scheduler.SingleTimedScheduler$TimedPeriodicScheduledRunnable.run(SingleTimedScheduler.java:394) ~[reactor-core-3.0.4.RELEASE.jar:3.0.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
我无法理解为什么请求在 curl 命令暂停后的某个时间异常终止(在 spring-web-reactive 实现中),而在 akka 示例中(如 youtube 中所示) link) 一旦 tcp 缓冲区已满,服务器就停止发布事件。
Flux.interval
是一个特例,因为它是一个热源并且时间没有被 Reactor 缓冲;这意味着如果您的请求周期由于背压而变慢并且您的间隔源生成速度更快,Reactor 将发出一个错误信号。
您可以使用 .onBackpressureDrop()
运算符更新此示例,以在出现背压时降低间隔。这应该按预期运行。
有很多方法可以说明背压,包括:
- 使用
delay
运算符延迟订阅 - 模拟多个慢速客户端(带宽和延迟)