如何停止主线程以完成所有 Mono 调用?
How to stop main thread to complete all Mono calls?
我正在对 DB.And 进行多次单声道调用,需要所有单声道响应的结果来计算在声明的单声道逻辑之后写入的最终结果。
if (SomeObject.getAccountLevelActiveList() != null) {
SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
Mono<SubLine> subLineMono= SubLineService
.getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
subLineMono.subscribe(subLine-> PollObject.getSubList()
.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));
});
}
但我的主要逻辑是在单声道结果存储到 PollObject 之前执行。所以我在 PollObject 中得到 null。所以我想停止我的主线程,直到 Mono 结果存储到 PollObject.
如果你想停止主线程那么你可以使用阻塞而不是订阅,但是你必须首先将 List
转换成 Flux
然后 flatMap
-it 使用提供的 Mono
。 subscribe
方法中的逻辑可以移动到副作用运算符 doOnNext
中 Mono
或包装 Flux
:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).doOnNext(subLine ->
PollObject.getSubList().put(accountLevelMtn.getMtn(),
Optional.ofNullable(subLine))
).blockLast();
// the following code will be executed first when all monos are completed
如果主线程中 运行 不需要 if
之后的代码,最好保持反应性,正如 @chrylis-cautiouslyoptimistic 已经建议的那样。使用 reduce
运算符将所有结果放在一起,生成在所有提供的单声道完成时完成的单声道:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).reduce(PollObject.getSubList(), (subList, subLine) ->
sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
).map(subList -> {
// the code here will be executed first when all monos are completed
})
// ... other operators if necessary
// eventually subscribing or returning the mono for further processing
.subscribe();
我正在对 DB.And 进行多次单声道调用,需要所有单声道响应的结果来计算在声明的单声道逻辑之后写入的最终结果。
if (SomeObject.getAccountLevelActiveList() != null) {
SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
Mono<SubLine> subLineMono= SubLineService
.getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
subLineMono.subscribe(subLine-> PollObject.getSubList()
.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));
});
}
但我的主要逻辑是在单声道结果存储到 PollObject 之前执行。所以我在 PollObject 中得到 null。所以我想停止我的主线程,直到 Mono 结果存储到 PollObject.
如果你想停止主线程那么你可以使用阻塞而不是订阅,但是你必须首先将 List
转换成 Flux
然后 flatMap
-it 使用提供的 Mono
。 subscribe
方法中的逻辑可以移动到副作用运算符 doOnNext
中 Mono
或包装 Flux
:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).doOnNext(subLine ->
PollObject.getSubList().put(accountLevelMtn.getMtn(),
Optional.ofNullable(subLine))
).blockLast();
// the following code will be executed first when all monos are completed
如果主线程中 运行 不需要 if
之后的代码,最好保持反应性,正如 @chrylis-cautiouslyoptimistic 已经建议的那样。使用 reduce
运算符将所有结果放在一起,生成在所有提供的单声道完成时完成的单声道:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).reduce(PollObject.getSubList(), (subList, subLine) ->
sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
).map(subList -> {
// the code here will be executed first when all monos are completed
})
// ... other operators if necessary
// eventually subscribing or returning the mono for further processing
.subscribe();