Resilience4j 速率限制器在项目反应器中无法正常工作?
Resilience4j rate limiter is not working properly in project reactor?
我目前正在研究 resilience4j 库,但出于某种原因,以下代码无法按预期工作:
@Test
public void testRateLimiterProjectReactor()
{
// The configuration below will allow 2 requests per second and a "timeout" of 2 seconds.
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(2))
.build();
// Step 2.
// Create a RateLimiter and use it.
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter rateLimiter = registry.rateLimiter("myReactorServiceNameLimiter");
// Step 3.
Flux<Integer> flux = Flux.from(Flux.range(0, 10))
.transformDeferred(RateLimiterOperator.of(rateLimiter))
.log()
;
StepVerifier.create(flux)
.expectNextCount(10)
.expectComplete()
.verify()
;
}
根据官方示例here and here,这应该限制每秒 request()
至 2
个元素。但是,日志显示它正在立即获取所有元素:
15:08:24.587 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:08:24.619 [main] INFO reactor.Flux.Defer.1 - onSubscribe(RateLimiterSubscriber)
15:08:24.624 [main] INFO reactor.Flux.Defer.1 - request(unbounded)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(0)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(1)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(2)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(3)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(4)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(5)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(6)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(7)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(8)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(9)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onComplete()
我看不出有什么问题?
正如上面评论中已经回答的那样,RateLimiter 跟踪订阅的数量,而不是元素。要实现对元素的速率限制,您可以使用 limitRate(和 buffer + delayElements)。
例如,
Flux.range(1, 100)
.delayElements(Duration.ofMillis(100)) // to imitate a publisher that produces elements at a certain rate
.log()
.limitRate(10) // used to requests up to 10 elements from the publisher
.buffer(10) // groups integers by 10 elements
.delayElements(Duration.ofSeconds(2)) // emits a group of ints every 2 sec
.subscribe(System.out::println);
我目前正在研究 resilience4j 库,但出于某种原因,以下代码无法按预期工作:
@Test
public void testRateLimiterProjectReactor()
{
// The configuration below will allow 2 requests per second and a "timeout" of 2 seconds.
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(2))
.build();
// Step 2.
// Create a RateLimiter and use it.
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter rateLimiter = registry.rateLimiter("myReactorServiceNameLimiter");
// Step 3.
Flux<Integer> flux = Flux.from(Flux.range(0, 10))
.transformDeferred(RateLimiterOperator.of(rateLimiter))
.log()
;
StepVerifier.create(flux)
.expectNextCount(10)
.expectComplete()
.verify()
;
}
根据官方示例here and here,这应该限制每秒 request()
至 2
个元素。但是,日志显示它正在立即获取所有元素:
15:08:24.587 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:08:24.619 [main] INFO reactor.Flux.Defer.1 - onSubscribe(RateLimiterSubscriber)
15:08:24.624 [main] INFO reactor.Flux.Defer.1 - request(unbounded)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(0)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(1)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(2)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(3)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(4)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(5)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(6)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(7)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(8)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(9)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onComplete()
我看不出有什么问题?
正如上面评论中已经回答的那样,RateLimiter 跟踪订阅的数量,而不是元素。要实现对元素的速率限制,您可以使用 limitRate(和 buffer + delayElements)。 例如,
Flux.range(1, 100)
.delayElements(Duration.ofMillis(100)) // to imitate a publisher that produces elements at a certain rate
.log()
.limitRate(10) // used to requests up to 10 elements from the publisher
.buffer(10) // groups integers by 10 elements
.delayElements(Duration.ofSeconds(2)) // emits a group of ints every 2 sec
.subscribe(System.out::println);