使用 Spring Project Reactor 延迟背压后重试?

Retry after delay on back pressure with Spring Project Reactor?

背景

我正在尝试使用 Spring Project Reactor 版本 3.3.0 实现类似于简单非阻塞速率限制器的功能。例如,要将数量限制为每秒 100 个请求,我使用此实现:

myFlux
      .bufferTimeout(100, Duration.ofSeconds(1))
      .delayElements(Duration.ofSeconds(1))
      ..

这适用于我的用例,但如果订阅者跟不上 myFlux 发布者的速度,它将(正确地)抛出一个 OverflowException:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
    reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)

在我的例子中,订阅者使用所有元素很重要,例如降低背压 (onBackpressureDrop()) 是不可接受的。

问题

有没有一种方法,而不是在背压下丢弃元素,而是暂停消息的发布,直到订阅者跟上进度?在我的例子中,myFlux 正在发布一个有限但大量的元素,它们保存在一个持久的数据库中,因此恕我直言,不需要删除元素。

bufferTimeout(int maxSize, Duration maxTime) 请求无限数量的消息,因此对背压不敏感。这使得它不适合你的情况。

在概念层面上,bufferTimeout 不能对背压敏感,因为您明确指示发布者在每个经过的持续时间内发出一批(即使它是空的)。如果订阅者太慢,这将 - 理所当然地 - 导致溢出。

相反,尝试:

myFlux
   .delayElements(Duration.ofMillis(10))
   .buffer(100)

myFlux
   .buffer(100)
   .delayElements(Duration.ofSeconds(1))

buffer(int maxSize) 请求正确的上游数量 (request * maxSize),因此对订阅者的背压很敏感。