Reactor core, take 30 then wait before take another 30

Reactor core, take 30 then wait before taking another 30

使用 reactor.core.publisher.Flux,如何从 Flux 中获取 n 个值,然后等待一段时间再获取下一批?

onboardService
  .loadRepositories(user)               // Flux of values
  .take(30)                             // Take 30 from the flux
  .delayElements(Duration.ofMinutes(1)) // Wait one minute
  .doOnEach(...)                        // Process the value
  .???                                  // How to repeat with the next 30?

在链的更深处,我将 Flux 中的每个值推送到速率限制为每分钟 30 个的服务。

onboardService
  .loadRepositories(user)   
  .limitRate(10)
  .delayElements(Duration.ofSeconds(10))

听起来像我想要的,但它的行为并不像我期望的那样。将它与这些参数一起使用,它在处理每个单独的通量之间等待 10 秒,而我希望它处理 10 个,然后再处理 10 个。

有没有更好的方法来避免终端服务过载?

您可以 window 通量并将每个 window 限制为一分钟:

onboardService.loadRepositories(user)
  .transform(flux ->
               flux
                 .window(30)
                 .zipWith(Flux.interval(Duration.ZERO, Duration.ofMinutes(1)))
                 .flatMap(Tuple2::getT1))
  .doOnEach(...) // Process the value