Flux 在 'then' 之前不等待元素完成
Flux does not wait for completion of elements before 'then'
我无法理解这个问题,我不确定我做错了什么
我想等待 flux 结束,然后 return serverResponse 的单声道
我附上了代码片段,doOnNext 将填充 categoryIdToPrintRepository
我查看了如何在 flux 结束后 return mono 并找到了 'then' 但 'then' 方法仍然在处理 onNextSite 之前执行,这结果错误:
java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
我做错了什么?
public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
return Mono.just("start").flatMap(id ->
Flux.fromIterable(appSettings.getSites())
.subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
.doOnNext(this::onNextSite)
.then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));
}
private void onNextSite(Integer siteId) {
IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
Optional<SiteCatalogCategoryDTO> cacheData =
siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
() -> {
Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
.get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
});
});
}
private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
int siteId, int catalogId) {
if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
.subscribe(mapSCC -> {
categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
"Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
});
}
你在你的应用程序中做了一些你不应该 subscribe
的错误,并且你有无效的方法,除非在特定的地方,否则不应在反应式编程中使用。
这是一些示例代码:
// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
Mono.just("Foo");
}
// complier error
doSomething().subscribe( ... );
您的应用程序是 publisher
调用客户端,是订阅者,这就是为什么我们 return 将 Mono 或 Flux 输出到调用客户端,它们 subscribe
.
你是这样解决的:
private void doSomething() {
Mono.just("Foo").subscribe( ... );
}
doSomething();
现在你订阅自己来获取东西运行,这是不正确的方式,之前说过,调用客户端是订阅者,而不是你。
正确方法:
private Mono<String> doSomething() {
return Mono.just("Foo");
}
// This is returned out to the calling client, they subscribe
return doSomething();
当一个Mono/Flux完成时,它会发出一个信号,这个信号会触发链中的下一个、下一个、下一个。
所以我对你需要做的事情的看法如下:
- 删除所有
subscribes
,如果你想做的事情有函数,flatmap
,map
,doOnSuccess
等等。保持链条完好无损,一直到客户端。
- 删除所有 void 函数,确保它们 return 一个
Flux
或一个 Mono
如果你想 not return something return a Mono<Void>
通过使用 Mono.empty()
函数,这样链就会完整。
一旦你使用 Mono/Flux 你需要处理 return 以便其他人可以继续。
更新:
为了then
触发,你必须return一些东西,当前一个mono/flux完成时它会return。
示例:
private Flux<String> doSomething() {
return Flux.just("Foo", "Bar", "FooBar")
.doOnNext(string -> {
return // return something
});
}
// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );
我无法理解这个问题,我不确定我做错了什么
我想等待 flux 结束,然后 return serverResponse 的单声道
我附上了代码片段,doOnNext 将填充 categoryIdToPrintRepository
我查看了如何在 flux 结束后 return mono 并找到了 'then' 但 'then' 方法仍然在处理 onNextSite 之前执行,这结果错误:
java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
我做错了什么?
public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
return Mono.just("start").flatMap(id ->
Flux.fromIterable(appSettings.getSites())
.subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
.doOnNext(this::onNextSite)
.then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));
}
private void onNextSite(Integer siteId) {
IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
Optional<SiteCatalogCategoryDTO> cacheData =
siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
() -> {
Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
.get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
});
});
}
private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
int siteId, int catalogId) {
if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
.subscribe(mapSCC -> {
categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
"Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
});
}
你在你的应用程序中做了一些你不应该 subscribe
的错误,并且你有无效的方法,除非在特定的地方,否则不应在反应式编程中使用。
这是一些示例代码:
// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
Mono.just("Foo");
}
// complier error
doSomething().subscribe( ... );
您的应用程序是 publisher
调用客户端,是订阅者,这就是为什么我们 return 将 Mono 或 Flux 输出到调用客户端,它们 subscribe
.
你是这样解决的:
private void doSomething() {
Mono.just("Foo").subscribe( ... );
}
doSomething();
现在你订阅自己来获取东西运行,这是不正确的方式,之前说过,调用客户端是订阅者,而不是你。
正确方法:
private Mono<String> doSomething() {
return Mono.just("Foo");
}
// This is returned out to the calling client, they subscribe
return doSomething();
当一个Mono/Flux完成时,它会发出一个信号,这个信号会触发链中的下一个、下一个、下一个。
所以我对你需要做的事情的看法如下:
- 删除所有
subscribes
,如果你想做的事情有函数,flatmap
,map
,doOnSuccess
等等。保持链条完好无损,一直到客户端。 - 删除所有 void 函数,确保它们 return 一个
Flux
或一个Mono
如果你想 not return something return aMono<Void>
通过使用Mono.empty()
函数,这样链就会完整。
一旦你使用 Mono/Flux 你需要处理 return 以便其他人可以继续。
更新:
为了then
触发,你必须return一些东西,当前一个mono/flux完成时它会return。
示例:
private Flux<String> doSomething() {
return Flux.just("Foo", "Bar", "FooBar")
.doOnNext(string -> {
return // return something
});
}
// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );