Reactor core, take 30 then wait before take another 30
Reactor core, take 30 then wait before taking another 30
使用 reactor.core.publisher.Flux
,如何从 Flux
中获取 n 个值,然后等待一段时间再获取下一批?
onboardService
.loadRepositories(user) // Flux of values
.take(30) // Take 30 from the flux
.delayElements(Duration.ofMinutes(1)) // Wait one minute
.doOnEach(...) // Process the value
.??? // How to repeat with the next 30?
在链的更深处,我将 Flux
中的每个值推送到速率限制为每分钟 30 个的服务。
onboardService
.loadRepositories(user)
.limitRate(10)
.delayElements(Duration.ofSeconds(10))
听起来像我想要的,但它的行为并不像我期望的那样。将它与这些参数一起使用,它在处理每个单独的通量之间等待 10 秒,而我希望它处理 10 个,然后再处理 10 个。
有没有更好的方法来避免终端服务过载?
您可以 window 通量并将每个 window 限制为一分钟:
onboardService.loadRepositories(user)
.transform(flux ->
flux
.window(30)
.zipWith(Flux.interval(Duration.ZERO, Duration.ofMinutes(1)))
.flatMap(Tuple2::getT1))
.doOnEach(...) // Process the value
使用 reactor.core.publisher.Flux
,如何从 Flux
中获取 n 个值,然后等待一段时间再获取下一批?
onboardService
.loadRepositories(user) // Flux of values
.take(30) // Take 30 from the flux
.delayElements(Duration.ofMinutes(1)) // Wait one minute
.doOnEach(...) // Process the value
.??? // How to repeat with the next 30?
在链的更深处,我将 Flux
中的每个值推送到速率限制为每分钟 30 个的服务。
onboardService
.loadRepositories(user)
.limitRate(10)
.delayElements(Duration.ofSeconds(10))
听起来像我想要的,但它的行为并不像我期望的那样。将它与这些参数一起使用,它在处理每个单独的通量之间等待 10 秒,而我希望它处理 10 个,然后再处理 10 个。
有没有更好的方法来避免终端服务过载?
您可以 window 通量并将每个 window 限制为一分钟:
onboardService.loadRepositories(user)
.transform(flux ->
flux
.window(30)
.zipWith(Flux.interval(Duration.ZERO, Duration.ofMinutes(1)))
.flatMap(Tuple2::getT1))
.doOnEach(...) // Process the value