是否可以在 Mono/Flux 中创建动态 filter/predicate?
Is it possible to create a dynamic filter/predicate in Mono/Flux?
该应用程序是一个简单的处理 - 读取数据、过滤数据、处理和写入过滤后的数据。
这是一个运行没有任何问题的简单代码:
void run() {
Flux.interval(Duration.ofMillis(200))
.filter(value -> getPredicate().test(value))
.flatMap(this::writeData)
.subscribe();
}
private Predicate<Long> getPredicate() {
return value -> value % 2 == 0;
}
是否可以使用定期请求从远程 Web 服务检索的动态谓词?
如果可能 - 如何在 .filter() 中使用 Mono
并保持非阻塞
例如,将 getPredicate() 替换为以下内容:
private Mono<Predicate<Long>> getPredicateFromRemoteServer() {
return webClient.get()
.uri("/filters/1")
.retrieve()
.bodyToMono(Filter.class)
.map(this::mapToPredicate)
.cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
}
private Predicate<Long> mapToPredicate(Filter filter) {
// here will be converting filter object into predicate
return value -> value > 5;
}
理想情况下,我想避免使用缓存(Duration.ofMinutes(10)),因为过滤器可能每分钟或每天更新一次……一旦过滤器更新,我的服务就会收到通知,但我没有找不到在外部使缓存失效的方法,这就是为什么 Duration.ofMinutes(10) 用于某些近似失效的原因。
好吧,也许您可以稍微不同地编写管道。与其每次通过调用 getPredicateFromRemoteServer()
处理流中的项目时都希望 return 一个新的 Predicate
,您可以使函数本身成为谓词。传递您正在从流中处理的值,并使它成为 return 一个带有答案的 Mono<Boolean>
,并在您的管道中的 filterWhen
管道中使用它。
例如,有点像这样:
private Mono<Boolean> isWithinThreshold(int value) {
return webClient.get()
.uri("/filters/1")
.retrieve()
.bodyToMono(Filter.class)
.map(filter -> filter.threshold <= value)
.cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
}
然后在你的主管道中你可以做:
Flux.interval(Duration.ofMillis(200))
.filterWhen(value -> isWithinThreshold(value))
.flatMap(this::writeData)
.subscribe();
}
该应用程序是一个简单的处理 - 读取数据、过滤数据、处理和写入过滤后的数据。
这是一个运行没有任何问题的简单代码:
void run() {
Flux.interval(Duration.ofMillis(200))
.filter(value -> getPredicate().test(value))
.flatMap(this::writeData)
.subscribe();
}
private Predicate<Long> getPredicate() {
return value -> value % 2 == 0;
}
是否可以使用定期请求从远程 Web 服务检索的动态谓词? 如果可能 - 如何在 .filter() 中使用 Mono
并保持非阻塞例如,将 getPredicate() 替换为以下内容:
private Mono<Predicate<Long>> getPredicateFromRemoteServer() { return webClient.get() .uri("/filters/1") .retrieve() .bodyToMono(Filter.class) .map(this::mapToPredicate) .cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO); } private Predicate<Long> mapToPredicate(Filter filter) { // here will be converting filter object into predicate return value -> value > 5; }
理想情况下,我想避免使用缓存(Duration.ofMinutes(10)),因为过滤器可能每分钟或每天更新一次……一旦过滤器更新,我的服务就会收到通知,但我没有找不到在外部使缓存失效的方法,这就是为什么 Duration.ofMinutes(10) 用于某些近似失效的原因。
好吧,也许您可以稍微不同地编写管道。与其每次通过调用 getPredicateFromRemoteServer()
处理流中的项目时都希望 return 一个新的 Predicate
,您可以使函数本身成为谓词。传递您正在从流中处理的值,并使它成为 return 一个带有答案的 Mono<Boolean>
,并在您的管道中的 filterWhen
管道中使用它。
例如,有点像这样:
private Mono<Boolean> isWithinThreshold(int value) {
return webClient.get()
.uri("/filters/1")
.retrieve()
.bodyToMono(Filter.class)
.map(filter -> filter.threshold <= value)
.cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
}
然后在你的主管道中你可以做:
Flux.interval(Duration.ofMillis(200))
.filterWhen(value -> isWithinThreshold(value))
.flatMap(this::writeData)
.subscribe();
}