如何延迟重复的 WebClient get 请求
How to delay repeated WebClient get request
我正在使用 Spring 5 WebClient 从 REST api.
中重复获取 运行 进程的某些状态
在 的帮助下,我现在得出了这个解决方案:
webClient.get().uri(...).retrieve.bodyToMono(State.class)
.repeat()
.skipUntil(state -> stateFinished())
.limitRequest(1)
.subscribe(state -> {...});
虽然这有效,但 get 请求的触发率非常高。将请求速率限制为每秒 1 个请求的正确方法是什么?
我尝试使用 delayElements(Duration.ofSeconds(1))
但这只会延迟结果,而不是请求本身。
在您的情况下还有另一个小解决方法,您使用用作限制器的 Flux 压缩每个调用。
.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))
虽然我认为 delayElements()
可以工作,但也许您没有将它放在 Webclient 堆栈的正确阶段。
您可以将 repeatWhen
运算符与您自定义的伴随 Publisher
运算符一起使用
Mono.just("test")
.repeatWhen(longFlux -> Flux.interval(Duration.ofSeconds(1)))
.take(5)
.log()
.blockLast();
或使用 reactor-addons
中的 Repeate
函数
Mono.just("test")
.repeatWhen(Repeat.times(Long.MAX_VALUE)
.fixedBackoff(Duration.ofSeconds(1)))
.take(5)
.log()
.blockLast();
您的问题的替代解决方案
Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
.onBackpressureDrop()
.concatMap(i -> webClientCall(...), 1)
//or flatMap() if you want send request each second
.filter(state -> stateFinished(state))
.next()
.timeout(Duration.ofSeconds(...))
//
.subscribe(state -> {...});
但请记住,如果您自己订阅(而不是 Spring),则反应器订阅者上下文不会传播到您的请求(没有安全上下文、侦探等...)
你在使用 delayElements
告诉我你把它放在重复之后。您要延迟的是对 WebClient 的订阅。
webClient
.get()
.uri(...)
.retrieve
.bodyToMono(State.class)
.delaySubscription(Duration.ofSeconds(1)) //Just add this before the repeat
.repeat()
.skipUntil(state -> stateFinished())
.limitRequest(1)
.subscribe(state -> {...});
这样做可以确保在第n个请求的响应和第n+1个请求的触发之间有一个秒。如果您需要固定的调用频率而不考虑每个请求响应所花费的时间,请按照 Roman 的建议用 Flux.interval
包装您的代码。
我正在使用 Spring 5 WebClient 从 REST api.
中重复获取 运行 进程的某些状态在
webClient.get().uri(...).retrieve.bodyToMono(State.class)
.repeat()
.skipUntil(state -> stateFinished())
.limitRequest(1)
.subscribe(state -> {...});
虽然这有效,但 get 请求的触发率非常高。将请求速率限制为每秒 1 个请求的正确方法是什么?
我尝试使用 delayElements(Duration.ofSeconds(1))
但这只会延迟结果,而不是请求本身。
在您的情况下还有另一个小解决方法,您使用用作限制器的 Flux 压缩每个调用。
.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))
虽然我认为 delayElements()
可以工作,但也许您没有将它放在 Webclient 堆栈的正确阶段。
您可以将 repeatWhen
运算符与您自定义的伴随 Publisher
Mono.just("test")
.repeatWhen(longFlux -> Flux.interval(Duration.ofSeconds(1)))
.take(5)
.log()
.blockLast();
或使用 reactor-addons
中的Repeate
函数
Mono.just("test")
.repeatWhen(Repeat.times(Long.MAX_VALUE)
.fixedBackoff(Duration.ofSeconds(1)))
.take(5)
.log()
.blockLast();
您的问题的替代解决方案
Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
.onBackpressureDrop()
.concatMap(i -> webClientCall(...), 1)
//or flatMap() if you want send request each second
.filter(state -> stateFinished(state))
.next()
.timeout(Duration.ofSeconds(...))
//
.subscribe(state -> {...});
但请记住,如果您自己订阅(而不是 Spring),则反应器订阅者上下文不会传播到您的请求(没有安全上下文、侦探等...)
你在使用 delayElements
告诉我你把它放在重复之后。您要延迟的是对 WebClient 的订阅。
webClient
.get()
.uri(...)
.retrieve
.bodyToMono(State.class)
.delaySubscription(Duration.ofSeconds(1)) //Just add this before the repeat
.repeat()
.skipUntil(state -> stateFinished())
.limitRequest(1)
.subscribe(state -> {...});
这样做可以确保在第n个请求的响应和第n+1个请求的触发之间有一个秒。如果您需要固定的调用频率而不考虑每个请求响应所花费的时间,请按照 Roman 的建议用 Flux.interval
包装您的代码。