使用 Spring Project Reactor 延迟背压后重试?
Retry after delay on back pressure with Spring Project Reactor?
背景
我正在尝试使用 Spring Project Reactor 版本 3.3.0 实现类似于简单非阻塞速率限制器的功能。例如,要将数量限制为每秒 100 个请求,我使用此实现:
myFlux
.bufferTimeout(100, Duration.ofSeconds(1))
.delayElements(Duration.ofSeconds(1))
..
这适用于我的用例,但如果订阅者跟不上 myFlux
发布者的速度,它将(正确地)抛出一个 OverflowException
:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
在我的例子中,订阅者使用所有元素很重要,例如降低背压 (onBackpressureDrop()
) 是不可接受的。
问题
有没有一种方法,而不是在背压下丢弃元素,而是暂停消息的发布,直到订阅者跟上进度?在我的例子中,myFlux
正在发布一个有限但大量的元素,它们保存在一个持久的数据库中,因此恕我直言,不需要删除元素。
bufferTimeout(int maxSize, Duration maxTime)
请求无限数量的消息,因此对背压不敏感。这使得它不适合你的情况。
在概念层面上,bufferTimeout
不能对背压敏感,因为您明确指示发布者在每个经过的持续时间内发出一批(即使它是空的)。如果订阅者太慢,这将 - 理所当然地 - 导致溢出。
相反,尝试:
myFlux
.delayElements(Duration.ofMillis(10))
.buffer(100)
或
myFlux
.buffer(100)
.delayElements(Duration.ofSeconds(1))
buffer(int maxSize)
请求正确的上游数量 (request * maxSize
),因此对订阅者的背压很敏感。
背景
我正在尝试使用 Spring Project Reactor 版本 3.3.0 实现类似于简单非阻塞速率限制器的功能。例如,要将数量限制为每秒 100 个请求,我使用此实现:
myFlux
.bufferTimeout(100, Duration.ofSeconds(1))
.delayElements(Duration.ofSeconds(1))
..
这适用于我的用例,但如果订阅者跟不上 myFlux
发布者的速度,它将(正确地)抛出一个 OverflowException
:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
在我的例子中,订阅者使用所有元素很重要,例如降低背压 (onBackpressureDrop()
) 是不可接受的。
问题
有没有一种方法,而不是在背压下丢弃元素,而是暂停消息的发布,直到订阅者跟上进度?在我的例子中,myFlux
正在发布一个有限但大量的元素,它们保存在一个持久的数据库中,因此恕我直言,不需要删除元素。
bufferTimeout(int maxSize, Duration maxTime)
请求无限数量的消息,因此对背压不敏感。这使得它不适合你的情况。
在概念层面上,bufferTimeout
不能对背压敏感,因为您明确指示发布者在每个经过的持续时间内发出一批(即使它是空的)。如果订阅者太慢,这将 - 理所当然地 - 导致溢出。
相反,尝试:
myFlux
.delayElements(Duration.ofMillis(10))
.buffer(100)
或
myFlux
.buffer(100)
.delayElements(Duration.ofSeconds(1))
buffer(int maxSize)
请求正确的上游数量 (request * maxSize
),因此对订阅者的背压很敏感。