如何自动订阅inner Flux/Mono?
How to subscribe to inner Flux/Mono automatically?
我有一个(有界)Flux 的 Flux,我想将其转换为 Long 的 Flux,其中 Long 是内部 Flux 的大小:
Flux.just( Flux.just(1, 2, 3), Flux.just(1, 2) )
.map(Flux::count)
.log()
.subscribe();
执行日志如下:
onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
request(unbounded)
onNext({ "operator" : "Count" })
onNext({ "operator" : "Count" })
onComplete()
Flux::count returns 一个 Mono,不是一个 Long。订阅main flux时有没有算子自动解压这个inner mono?
flatMap()
适合你:
Transform the elements emitted by this Flux
asynchronously into Publisher
s, then flatten these inner publishers into a single Flux
through merging, which allow them to interleave.
https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#flatMap
我有一个(有界)Flux 的 Flux,我想将其转换为 Long 的 Flux,其中 Long 是内部 Flux 的大小:
Flux.just( Flux.just(1, 2, 3), Flux.just(1, 2) )
.map(Flux::count)
.log()
.subscribe();
执行日志如下:
onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
request(unbounded)
onNext({ "operator" : "Count" })
onNext({ "operator" : "Count" })
onComplete()
Flux::count returns 一个 Mono,不是一个 Long。订阅main flux时有没有算子自动解压这个inner mono?
flatMap()
适合你:
Transform the elements emitted by this
Flux
asynchronously intoPublisher
s, then flatten these inner publishers into a singleFlux
through merging, which allow them to interleave.
https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#flatMap