是否可以在 Mono/Flux 中创建动态 filter/predicate?

Is it possible to create a dynamic filter/predicate in Mono/Flux?

该应用程序是一个简单的处理 - 读取数据、过滤数据、处理和写入过滤后的数据。

这是一个运行没有任何问题的简单代码:

  void run() {
    Flux.interval(Duration.ofMillis(200))
        .filter(value -> getPredicate().test(value))
        .flatMap(this::writeData)
        .subscribe();
  }

  private Predicate<Long> getPredicate() {
    return value -> value % 2 == 0;
  }

是否可以使用定期请求从远程 Web 服务检索的动态谓词? 如果可能 - 如何在 .filter() 中使用 Mono

 并保持非阻塞

例如,将 getPredicate() 替换为以下内容:

  private Mono<Predicate<Long>> getPredicateFromRemoteServer() {
    return webClient.get()
        .uri("/filters/1")
        .retrieve()
        .bodyToMono(Filter.class)
        .map(this::mapToPredicate)
        .cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
  }

  private Predicate<Long> mapToPredicate(Filter filter) {
    // here will be converting filter object into predicate
    return value -> value > 5;
  }

理想情况下,我想避免使用缓存(Duration.ofMinutes(10)),因为过滤器可能每分钟或每天更新一次……一旦过滤器更新,我的服务就会收到通知,但我没有找不到在外部使缓存失效的方法,这就是为什么 Duration.ofMinutes(10) 用于某些近似失效的原因。

好吧,也许您可​​以稍微不同地编写管道。与其每次通过调用 getPredicateFromRemoteServer() 处理流中的项目时都希望 return 一个新的 Predicate,您可以使函数本身成为谓词。传递您正在从流中处理的值,并使它成为 return 一个带有答案的 Mono<Boolean>,并在您的管道中的 filterWhen 管道中使用它。

例如,有点像这样:

private Mono<Boolean> isWithinThreshold(int value) {
    return webClient.get()
        .uri("/filters/1")
        .retrieve()
        .bodyToMono(Filter.class)
        .map(filter -> filter.threshold <= value)
        .cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
  }

然后在你的主管道中你可以做:

Flux.interval(Duration.ofMillis(200))
        .filterWhen(value -> isWithinThreshold(value))
        .flatMap(this::writeData)
        .subscribe();
  }