使 filterWhen() 与 Mono 谓词并行
making filterWhen() with Mono predicate parallel
使用 Reactor Flux 过滤器时,我看到了一些我需要克服的行为。
给定以下代码:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
其中:
Mono<boolean> predicateMono(int value) { ... }
我注意到 predicateMono()
是按顺序执行的,这意味着对于值 2,在第一个完成之前不会调用该操作。
当我的代码中的 predicateMono()
s 是对我想并行执行的后端系统的 http 调用时,这就成了一个问题。我该如何编写才能以并行方式过滤通量值?
predicateMono()
是非阻塞的 http 调用,与反应式方法兼容。
解决方案是使用 flatmap
而不是 filterwhen
。如果您想过滤掉,只需映射到空。即将进行的操作将忽略 Flux 上的空值。
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));
使用 Reactor Flux 过滤器时,我看到了一些我需要克服的行为。
给定以下代码:
Flux.fromIterable(List.of(1,2))
.filterWhen(it -> predicateMono(it))
其中:
Mono<boolean> predicateMono(int value) { ... }
我注意到 predicateMono()
是按顺序执行的,这意味着对于值 2,在第一个完成之前不会调用该操作。
当我的代码中的 predicateMono()
s 是对我想并行执行的后端系统的 http 调用时,这就成了一个问题。我该如何编写才能以并行方式过滤通量值?
predicateMono()
是非阻塞的 http 调用,与反应式方法兼容。
解决方案是使用 flatmap
而不是 filterwhen
。如果您想过滤掉,只需映射到空。即将进行的操作将忽略 Flux 上的空值。
Flux.fromIterable(List.of(1, 2))
.flatMap(it ->
predicateMono(it).flatMap(result ->
result ? Mono.just(it) : Mono.empty()));