Java Reactor:有没有一种方法可以将 Flux<Mono<T>> 转换为 Flux<T> 而无需急切获取?

Java Reactor: Is there a way to transform Flux<Mono<T>> into Flux<T> without eager fetching?

我有一个快速但昂贵的生产者(Spring WebClient)和一个非常慢的订阅者。我需要一种方法来处理整个链中的背压。

在实施过程中,我意识到 flatMap、concatMap 和其他人使用预取,似乎无法禁用此行为。

在没有flatMap的情况下在订阅者中使用需求

Flux.defer(() -> Flux.range(1, 1000))
            .doOnRequest(i -> System.out.println("Requested: " + i))
            .doOnNext(v -> System.out.println("Emitted:   " + v))
            //.flatMap(Mono::just)
            .subscribe(new BaseSubscriber<Object>() {
                protected void hookOnSubscribe(final Subscription subscription) {
                    subscription.request(3);
                }

                protected void hookOnNext(final Object value) {
                    System.out.println("Received:  " + value);
                }
            });

.. 产生:

Requested: 3
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3

对 flatMap 使用相同的需求(未注释)产生:

Requested: 256
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3
Emitted:   4
Emitted:   5
...
Emitted:   254
Emitted:   255
Emitted:   256

似乎有一个未解决的问题:https://github.com/reactor/reactor-core/issues/1397

无论如何,我找到了适合我的情况的解决方案:block()。请记住,此操作仅允许在未标记为 "non-blocking operations only" 的线程上进行。 (另请参阅 Project Blockhound

总而言之,问题是在某些时候我有一个 Flux<Mono<T>>.flatMap(...).concatMap(...) 等使用某种急切获取。用于测试的Flux<Mono<T>>

final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0, 
(state, sink) -> {
    state += 1;
    sink.next(Mono.just(state));
    return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
  .doOnNext(v -> System.out.println("Emitted:   " + v));

为了不急于获取,我现在在地图中做了一个块,效果出奇的好:

monoFlux.map(Mono::block)
        .subscribe(new MySubscriber<>());

结果:

Requested: 3
Emitted:   MonoJust
Received:  1
Emitted:   MonoJust
Received:  2
Emitted:   MonoJust
Received:  3