如何处理 Flux filterWhen() 的负面情况?

How to handle negative case of Flux filterWhen()?

如果 filterWhen() 为 false,如何进行异步非阻塞调用?

我有大量来自 SQS 的项目。在某些情况下,我需要将它们过滤掉,但过滤后我仍然需要向第 3 方 api 发送一个包含项目详细信息的事件。

代码如下所示:

getItems()
  .map(this::doSomething)
  .flatMap(item -> callService1(item).thenReturn(item))
  .filterWhen(item -> check(item))
  .flatMap(item -> callService2(item))

我要实现的是:

getItems()
  .map(this::doSomething)
  .flatMap(item -> callService1(item).thenReturn(item))
  .filterWhen(item -> check(item))
  .ifFalse(item -> 3rdPartyApi(item))
  .flatMap(item -> callService2(item))

是否有任何运算符可用于将行替换为 ifFalse() 或使用其他方法?

如果它很简单,我们或许可以使用 flatMap 并在其中进行检查。但是它的 filterWhen() 并不能改变。

check(...) 发出的结果时,您必须将 API 调用附加到用作 filterWhen “条件”的发布者(建立在 check(...) 调用之上) =14=] 发布者是 false:

Mono<Boolean> check(String s) {
    return Mono.fromCallable(() -> s.contains("example"));
}

Mono<Long> sideEffectOnFiltered(String s) {
    return Mono.delay(Duration.ofSeconds(2))
        .doOnNext(b -> System.out.println("did side effect on " + s));
}

@Test
void demonstrateIfFalse() {
    Flux.just("example", "value", "example2")
        .filterWhen(v -> check(v)
            .flatMap(isExample -> isExample ? Mono.just(true) :
                sideEffectOnFiltered(v).thenReturn(false)
            )
    )
        .log()
        .blockLast();
}

这输出:

11:03:45.640 [main] INFO  reactor.Flux.FilterWhen.1 - onSubscribe(FluxFilterWhen.FluxFilterWhenSubscriber)
11:03:45.646 [main] INFO  reactor.Flux.FilterWhen.1 - request(unbounded)
11:03:45.704 [main] INFO  reactor.Flux.FilterWhen.1 - onNext(example)
did side effect on value
11:03:47.726 [parallel-1] INFO  reactor.Flux.FilterWhen.1 - onNext(example2)
11:03:47.730 [parallel-1] INFO  reactor.Flux.FilterWhen.1 - onComplete()