Project Reactor 中的背压是如何工作的?
How does backpressure work in Project Reactor?
我一直在 Spring Reactor 工作,之前的一些测试让我想知道 Fluxes 默认情况下如何处理背压。我知道 onBackpressureBuffer 等存在,我也读过 RxJava defaults to unbounded until you define whether to buffer, drop, etc.
那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么?
我尝试搜索答案,但没有找到任何明确的答案,只有 Backpressure 的定义或上面为 RxJava 链接的答案
什么是背压?
Backpressure or the ability for the consumer to signal the producer
that the rate of emission is too high - Reactor Reference
当我们谈论背压时,我们必须将 sources/publishers 分为两组:尊重订阅者需求的一组,以及忽略它的一组。
通常热点不尊重订阅者的需求,因为它们经常产生实时数据,比如收听 Twitter 提要。在此示例中,订阅者无法控制推文的创建速度,因此很容易被淹没。
另一方面,冷源通常在订阅发生时按需生成数据,例如发出 HTTP 请求然后处理响应。在这种情况下,您调用的 HTTP 服务器只会在您发送请求后发送响应。
重要的是要注意这不是规则:并非每个热源都忽略需求,也不是每个冷源都尊重需求。您可以阅读更多关于热源和冷源的信息 here.
让我们看一些可能有助于理解的示例。
尊重需求的发布者
给定一个生成从 1 到 Integer.MAX_VALUE
的数字的 Flux,并给定一个处理步骤需要 100 毫秒来处理单个元素:
Flux.range(1, Integer.MAX_VALUE)
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
.blockLast();
让我们看看日志:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)
我们可以看到在每个onNext之前都有一个请求。请求信号由 concatMap
运营商发送。当 concatMap
完成当前元素并准备接受下一个元素时发出信号。源仅在收到来自下游的请求时才发送下一个项目。
在此示例中,背压是自动的,我们不需要定义任何策略,因为操作员知道它可以处理什么并且源尊重它。
忽略需求且未定义背压策略的发布者
为了简单起见,我为这个例子选择了一个易于理解的冷发布者。 Flux.interval 每指定的时间间隔发出一个项目。这个冷漠的发布者不尊重需求是有道理的,因为看到项目以不同的、比最初指定的间隔更长的间隔发出是很奇怪的。
让我们看看代码:
Flux.interval(Duration.ofMillis(1))
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)))
.blockLast();
Source 每毫秒发出一个项目。订阅者每 100 毫秒能够处理一个项目。很明显,订阅者无法跟上生产者的步伐,我们很快就会遇到这样的异常:
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
...
我们可以做些什么来避免这个异常?
定义了忽略需求和背压策略的发布者
默认的背压策略就是我们在上面看到的那个:错误终止。 Reactor 不会对我们强制执行任何错误处理策略。当我们看到这种错误时,我们可以决定哪种错误最适合我们的用例。
您可以在 Reactor reference 中找到其中的几个。
对于这个例子,我们将使用最简单的一个:onBackpressureDrop
。
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop()
.concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
.doOnNext(a -> System.out.println("Element kept by consumer: " + a))
.blockLast();
输出:
Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407
我们可以看到,在前 32 个项目之后,有一个很大的跳跃到 2400。由于定义的策略,中间的元素被丢弃了。
要点
- 背压通常是自动的,我们不需要做任何事情,因为我们是按需获取数据。
- 如果来源不尊重订阅者的需求,我们需要定义一个策略来避免终止错误。
更新:
有用的阅读:How to control request rate
我一直在 Spring Reactor 工作,之前的一些测试让我想知道 Fluxes 默认情况下如何处理背压。我知道 onBackpressureBuffer 等存在,我也读过 RxJava defaults to unbounded until you define whether to buffer, drop, etc.
那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么?
我尝试搜索答案,但没有找到任何明确的答案,只有 Backpressure 的定义或上面为 RxJava 链接的答案
什么是背压?
Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high - Reactor Reference
当我们谈论背压时,我们必须将 sources/publishers 分为两组:尊重订阅者需求的一组,以及忽略它的一组。
通常热点不尊重订阅者的需求,因为它们经常产生实时数据,比如收听 Twitter 提要。在此示例中,订阅者无法控制推文的创建速度,因此很容易被淹没。
另一方面,冷源通常在订阅发生时按需生成数据,例如发出 HTTP 请求然后处理响应。在这种情况下,您调用的 HTTP 服务器只会在您发送请求后发送响应。
重要的是要注意这不是规则:并非每个热源都忽略需求,也不是每个冷源都尊重需求。您可以阅读更多关于热源和冷源的信息 here.
让我们看一些可能有助于理解的示例。
尊重需求的发布者
给定一个生成从 1 到 Integer.MAX_VALUE
的数字的 Flux,并给定一个处理步骤需要 100 毫秒来处理单个元素:
Flux.range(1, Integer.MAX_VALUE)
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
.blockLast();
让我们看看日志:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)
我们可以看到在每个onNext之前都有一个请求。请求信号由 concatMap
运营商发送。当 concatMap
完成当前元素并准备接受下一个元素时发出信号。源仅在收到来自下游的请求时才发送下一个项目。
在此示例中,背压是自动的,我们不需要定义任何策略,因为操作员知道它可以处理什么并且源尊重它。
忽略需求且未定义背压策略的发布者
为了简单起见,我为这个例子选择了一个易于理解的冷发布者。 Flux.interval 每指定的时间间隔发出一个项目。这个冷漠的发布者不尊重需求是有道理的,因为看到项目以不同的、比最初指定的间隔更长的间隔发出是很奇怪的。
让我们看看代码:
Flux.interval(Duration.ofMillis(1))
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)))
.blockLast();
Source 每毫秒发出一个项目。订阅者每 100 毫秒能够处理一个项目。很明显,订阅者无法跟上生产者的步伐,我们很快就会遇到这样的异常:
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
...
我们可以做些什么来避免这个异常?
定义了忽略需求和背压策略的发布者
默认的背压策略就是我们在上面看到的那个:错误终止。 Reactor 不会对我们强制执行任何错误处理策略。当我们看到这种错误时,我们可以决定哪种错误最适合我们的用例。
您可以在 Reactor reference 中找到其中的几个。
对于这个例子,我们将使用最简单的一个:onBackpressureDrop
。
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop()
.concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
.doOnNext(a -> System.out.println("Element kept by consumer: " + a))
.blockLast();
输出:
Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407
我们可以看到,在前 32 个项目之后,有一个很大的跳跃到 2400。由于定义的策略,中间的元素被丢弃了。
要点
- 背压通常是自动的,我们不需要做任何事情,因为我们是按需获取数据。
- 如果来源不尊重订阅者的需求,我们需要定义一个策略来避免终止错误。
更新: 有用的阅读:How to control request rate