从单声道列表中获取第一个 onNext 信号
Getting first onNext signal from list of Mono
考虑有 3 个函数导致 Mono<Int>
s。我试图获得任何 Monos 发出的第一个结果。这是一个描述我正在寻找的测试:
fun main() {
StepVerifier
.create(firstElement())
.expectSubscription()
.expectNext(3)
.expectComplete()
.verify()
}
fun firstElement(): Mono<Int> = Flux.concat(_1(), _2(), _3(), _4()).next()
fun _1(): Mono<Int> = 1.toMono().delayElement(Duration.ofMillis(1000))
fun _2(): Mono<Int> = Mono.empty()
fun _3(): Mono<Int> = 3.toMono().delayElement(Duration.ofMillis(500))
fun _4(): Mono<Int> = Mono.error(RuntimeException())
问题在 firstElement()
,如何得到 3
因为它是第一个发出元素的。但是,正如您所看到的,从任何一个 Monos:
- 它们中的任何一个都可能比其他发射得更快
- 它们中的任何一个都可能发出空或
onComplete()
- 它们中的任何一个都可能发出错误或
onError()
我试过几个运算符:
Mono.zip {...}
要求全部发射,因为 return 是 Tuple<Int!>
Mono.first(...)
和 Flux.first(...).next()
传输 onComplete()
and/or onError()
Flux.concat(...)
去掉了 onComplete()
和 onError()
但它仍然是根据给定的 Publisher<T>
s 的顺序顺序订阅
您可以使用空 Mono
错误恢复并合并您的函数
private Mono<Integer> firstElement() {
return Flux.merge(
_1().onErrorResume(ignored -> Mono.empty()),
_2().onErrorResume(ignored -> Mono.empty()),
_3().onErrorResume(ignored -> Mono.empty()),
_4().onErrorResume(ignored -> Mono.empty()))
.next();
}
考虑有 3 个函数导致 Mono<Int>
s。我试图获得任何 Monos 发出的第一个结果。这是一个描述我正在寻找的测试:
fun main() {
StepVerifier
.create(firstElement())
.expectSubscription()
.expectNext(3)
.expectComplete()
.verify()
}
fun firstElement(): Mono<Int> = Flux.concat(_1(), _2(), _3(), _4()).next()
fun _1(): Mono<Int> = 1.toMono().delayElement(Duration.ofMillis(1000))
fun _2(): Mono<Int> = Mono.empty()
fun _3(): Mono<Int> = 3.toMono().delayElement(Duration.ofMillis(500))
fun _4(): Mono<Int> = Mono.error(RuntimeException())
问题在 firstElement()
,如何得到 3
因为它是第一个发出元素的。但是,正如您所看到的,从任何一个 Monos:
- 它们中的任何一个都可能比其他发射得更快
- 它们中的任何一个都可能发出空或
onComplete()
- 它们中的任何一个都可能发出错误或
onError()
我试过几个运算符:
Mono.zip {...}
要求全部发射,因为 return 是Tuple<Int!>
Mono.first(...)
和Flux.first(...).next()
传输onComplete()
and/oronError()
Flux.concat(...)
去掉了onComplete()
和onError()
但它仍然是根据给定的Publisher<T>
s 的顺序顺序订阅
您可以使用空 Mono
错误恢复并合并您的函数
private Mono<Integer> firstElement() {
return Flux.merge(
_1().onErrorResume(ignored -> Mono.empty()),
_2().onErrorResume(ignored -> Mono.empty()),
_3().onErrorResume(ignored -> Mono.empty()),
_4().onErrorResume(ignored -> Mono.empty()))
.next();
}