如何等待订阅完成?
How to wait for subscribe to finish?
我想使用 spring webclient 进行异步休息调用并取回 Mono。我也在并行进行一些数据库调用,但由于某些原因无法被动完成。
Map<String, Object> models = new HashMap<>();
Mono<User> users = this.webClient...;
users.map(resp -> new UserState(userRequest, resp))
.subscribe(response -> {
models.put("userState", response);
});
Iterable<Product> messages = this.productRepository.findAll();
models.put("products", messages);
//Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
return new ModelAndView("messages/list", models);
如何在返回 ModelAndView 之前等待订阅完成。如果我使用 Future
,我可以随时执行 get()
,这会很容易。
您可以将阻塞调用包装在一个 Mono
中,在单独的调度程序上执行,将其与包含 UserState
数据的 Mono
压缩并将它们的组合转换为 Mono<ModelAndView>
(可以从 Spring 控制器方法返回)。调用将并行执行,两个调用完成后将合并结果。
您可以为每个应用程序定义一个专门用于阻塞调用的有界调度程序,并将其作为构造函数参数提供给任何进行阻塞调用的class。
代码如下所示:
@Configuration
class SchedulersConfig {
@Bean
Scheduler parallelScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
return Schedulers.parallel(threadsCount);
}
}
@RestController
class Controller {
final Scheduler parallelScheduler;
...
Mono<User> userResponse = // webClient...
Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
.subscribeOn(parallelScheduler);
return Mono.zip(userResponse, productsResponse, (user, products) ->
new ModelAndView("messages/list",
ImmutableMap.of(
"userState", new UserState(userRequest, user),
"products", products
))
);
}
根据评论更新:
如果您只需要异步执行 HTTP 调用,然后将其与数据库结果结合起来,您可以执行以下操作
Map<String, Object> models = new HashMap<>();
Mono<User> userMono = webClient...;
CompletableFuture<User> userFuture = userMono.toFuture();
Iterable<Product> messages = productRepository.findAll();
User user = userFuture.join();
models.put("products", messages);
models.put("userState", new UserState(userRequest, user));
return new ModelAndView("messages/list", models);
我想使用 spring webclient 进行异步休息调用并取回 Mono。我也在并行进行一些数据库调用,但由于某些原因无法被动完成。
Map<String, Object> models = new HashMap<>();
Mono<User> users = this.webClient...;
users.map(resp -> new UserState(userRequest, resp))
.subscribe(response -> {
models.put("userState", response);
});
Iterable<Product> messages = this.productRepository.findAll();
models.put("products", messages);
//Wait for users.subscribe to finish <<<<<<<<<<<<<HERE
return new ModelAndView("messages/list", models);
如何在返回 ModelAndView 之前等待订阅完成。如果我使用 Future
,我可以随时执行 get()
,这会很容易。
您可以将阻塞调用包装在一个 Mono
中,在单独的调度程序上执行,将其与包含 UserState
数据的 Mono
压缩并将它们的组合转换为 Mono<ModelAndView>
(可以从 Spring 控制器方法返回)。调用将并行执行,两个调用完成后将合并结果。
您可以为每个应用程序定义一个专门用于阻塞调用的有界调度程序,并将其作为构造函数参数提供给任何进行阻塞调用的class。
代码如下所示:
@Configuration
class SchedulersConfig {
@Bean
Scheduler parallelScheduler(@Value("${blocking-thread-pool-size}") int threadsCount) {
return Schedulers.parallel(threadsCount);
}
}
@RestController
class Controller {
final Scheduler parallelScheduler;
...
Mono<User> userResponse = // webClient...
Mono<Iterable<Product>> productsResponse = Mono.fromSupplier(productRepository::findAll)
.subscribeOn(parallelScheduler);
return Mono.zip(userResponse, productsResponse, (user, products) ->
new ModelAndView("messages/list",
ImmutableMap.of(
"userState", new UserState(userRequest, user),
"products", products
))
);
}
根据评论更新:
如果您只需要异步执行 HTTP 调用,然后将其与数据库结果结合起来,您可以执行以下操作
Map<String, Object> models = new HashMap<>();
Mono<User> userMono = webClient...;
CompletableFuture<User> userFuture = userMono.toFuture();
Iterable<Product> messages = productRepository.findAll();
User user = userFuture.join();
models.put("products", messages);
models.put("userState", new UserState(userRequest, user));
return new ModelAndView("messages/list", models);