Java Reactor:有没有一种方法可以将 Flux<Mono<T>> 转换为 Flux<T> 而无需急切获取?
Java Reactor: Is there a way to transform Flux<Mono<T>> into Flux<T> without eager fetching?
我有一个快速但昂贵的生产者(Spring WebClient)和一个非常慢的订阅者。我需要一种方法来处理整个链中的背压。
在实施过程中,我意识到 flatMap、concatMap 和其他人使用预取,似乎无法禁用此行为。
在没有flatMap的情况下在订阅者中使用需求
Flux.defer(() -> Flux.range(1, 1000))
.doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v))
//.flatMap(Mono::just)
.subscribe(new BaseSubscriber<Object>() {
protected void hookOnSubscribe(final Subscription subscription) {
subscription.request(3);
}
protected void hookOnNext(final Object value) {
System.out.println("Received: " + value);
}
});
.. 产生:
Requested: 3
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
对 flatMap 使用相同的需求(未注释)产生:
Requested: 256
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
Emitted: 4
Emitted: 5
...
Emitted: 254
Emitted: 255
Emitted: 256
似乎有一个未解决的问题:https://github.com/reactor/reactor-core/issues/1397
无论如何,我找到了适合我的情况的解决方案:block()
。请记住,此操作仅允许在未标记为 "non-blocking operations only" 的线程上进行。 (另请参阅 Project Blockhound)
总而言之,问题是在某些时候我有一个 Flux<Mono<T>>
和 .flatMap(...)
、.concatMap(...)
等使用某种急切获取。用于测试的Flux<Mono<T>>
:
final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0,
(state, sink) -> {
state += 1;
sink.next(Mono.just(state));
return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v));
为了不急于获取,我现在在地图中做了一个块,效果出奇的好:
monoFlux.map(Mono::block)
.subscribe(new MySubscriber<>());
结果:
Requested: 3
Emitted: MonoJust
Received: 1
Emitted: MonoJust
Received: 2
Emitted: MonoJust
Received: 3
我有一个快速但昂贵的生产者(Spring WebClient)和一个非常慢的订阅者。我需要一种方法来处理整个链中的背压。
在实施过程中,我意识到 flatMap、concatMap 和其他人使用预取,似乎无法禁用此行为。
在没有flatMap的情况下在订阅者中使用需求
Flux.defer(() -> Flux.range(1, 1000))
.doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v))
//.flatMap(Mono::just)
.subscribe(new BaseSubscriber<Object>() {
protected void hookOnSubscribe(final Subscription subscription) {
subscription.request(3);
}
protected void hookOnNext(final Object value) {
System.out.println("Received: " + value);
}
});
.. 产生:
Requested: 3
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
对 flatMap 使用相同的需求(未注释)产生:
Requested: 256
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
Emitted: 4
Emitted: 5
...
Emitted: 254
Emitted: 255
Emitted: 256
似乎有一个未解决的问题:https://github.com/reactor/reactor-core/issues/1397
无论如何,我找到了适合我的情况的解决方案:block()
。请记住,此操作仅允许在未标记为 "non-blocking operations only" 的线程上进行。 (另请参阅 Project Blockhound)
总而言之,问题是在某些时候我有一个 Flux<Mono<T>>
和 .flatMap(...)
、.concatMap(...)
等使用某种急切获取。用于测试的Flux<Mono<T>>
:
final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0,
(state, sink) -> {
state += 1;
sink.next(Mono.just(state));
return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v));
为了不急于获取,我现在在地图中做了一个块,效果出奇的好:
monoFlux.map(Mono::block)
.subscribe(new MySubscriber<>());
结果:
Requested: 3
Emitted: MonoJust
Received: 1
Emitted: MonoJust
Received: 2
Emitted: MonoJust
Received: 3