使 filterWhen() 与 Mono 谓词并行

making filterWhen() with Mono predicate parallel

使用 Reactor Flux 过滤器时,我看到了一些我需要克服的行为。

给定以下代码:

Flux.fromIterable(List.of(1,2))
  .filterWhen(it -> predicateMono(it))

其中:

Mono<boolean> predicateMono(int value) { ... } 

我注意到 predicateMono() 是按顺序执行的,这意味着对于值 2,在第一个完成之前不会调用该操作。

当我的代码中的 predicateMono()s 是对我想并行执行的后端系统的 http 调用时,这就成了一个问题。我该如何编写才能以并行方式过滤通量值?

predicateMono() 是非阻塞的 http 调用,与反应式方法兼容。

解决方案是使用 flatmap 而不是 filterwhen。如果您想过滤掉,只需映射到空。即将进行的操作将忽略 Flux 上的空值。

Flux.fromIterable(List.of(1, 2))
        .flatMap(it ->
            predicateMono(it).flatMap(result ->
                result ? Mono.just(it) : Mono.empty()));