Spring Webflux / Reactor:reactor-http-nio 与 boundedElastic
Spring Webflux / Reactor: reactor-http-nio vs boundedElastic
我对每个请求落在什么线程模型上的某些反应器概念有点困惑。我读了 https://projectreactor.io/docs/core/release/reference 但还是不清楚。让我们看一个例子:
@Autowired
UserRepository userRepository;
@GetMapping
Flux<User> findAll() {
log.info("findAll request arrived");
final Flux<User> defer = Flux.defer(() -> {
return Flux.fromIterable(userRepository.findAll());
});
return defer;
}
日志:[boundedElastic-4] - 信息 - findAll 请求到达
GET 方法在 Schedulers.boundedElastic 线程池中执行(I/O-bound 按照文档工作)
@PostMapping
Mono<User> save(@RequestBody User user) {
log.info("save request arrived");
final Mono<User> newUser = Mono.fromCallable(() -> {
final User userMono = userRepository.save(user);
log.info("user saved!");
return userMono;
});
return newUser.subscribeOn(Schedulers.boundedElastic());
}
日志:[reactor-http-nio-6] - 信息 - 保存请求到达
一个POST方法落在了http-nio线程池上。
@PostMapping("/test")
Mono<User> test() {
log.info("test");
return Mono.just(new User());
}
没有body的POST也落在Schedulers.boundedElastic.
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
reactorResourceFactory.setLoopResources(LoopResources.create("http-nio", 4, true));
reactorResourceFactory.setUseGlobalResources(false);
factory.setResourceFactory(reactorResourceFactory);
factory.setPort(8080);
return factory;
}
这就是我如何配置 http-nio 线程池。
所以,我的问题是:
- 为什么 POST 方法 body 被 http-nio 线程池处理?
- 这个http-nio线程池应该是一个较小的线程池,所以为什么POST方法和body(我认为被认为是阻塞代码)落在他们身上?
- 对 return newUser.subscribeOn(Schedulers.boundedElastic()); 有意义,还是应该保持在同一个线程上?
- 因为 I/O 对数据库或其他服务的 reading/saving 操作应该发生在不同的线程池上。如果您的存储库是反应式的,那么您可以看到它在不同的池上运行,将
http-nio
线程返回到池中。 WebClient
也是如此。如果您使用的是包含在 Reactor API 中的阻塞代码,那么您必须确保将 运行 放在不同的线程池中。
- 视情况而定。据我所知,您的存储库不是反应性的,因为它 returns
User
而不是 Mono<User
。然后使用不同的线程池是有意义的,这样你就不会阻塞 http-nio
线程。但是,为了确保执行线程将被切换,您必须使用 flatMap,因此代码如下所示:
@PostMapping
Mono<User> save(@RequestBody User user) {
log.info("save request arrived");
return Mono.just(user)
.flatMap(user -> saveUser(user));
}
private Mono<User> saveUser(User user) {
return Mono.fromCallable(() -> {
final User userMono = userRepository.save(user);
log.info("user saved!");
return userMono;
}).subscribeOn(Schedulers.boundedElastic());
}
此外,最好使用由您可以控制和监视的线程池支持的调度程序。恕我直言,一个经验法则是为每个资源使用一个专用线程池。因此,例如 1 个用于 Postgres DB 的线程池,1 个用于 google API(REST 调用)的线程池和 1 个用于 GitHub API 的线程池。为什么?如果这些资源中的任何一个出现问题(例如,它将在一定时间内不可用),那么其他线程池上的代码路径 运行 将不会被阻塞,并且您的应用程序将运行,至少对于某些代码路径而言.
我对每个请求落在什么线程模型上的某些反应器概念有点困惑。我读了 https://projectreactor.io/docs/core/release/reference 但还是不清楚。让我们看一个例子:
@Autowired
UserRepository userRepository;
@GetMapping
Flux<User> findAll() {
log.info("findAll request arrived");
final Flux<User> defer = Flux.defer(() -> {
return Flux.fromIterable(userRepository.findAll());
});
return defer;
}
日志:[boundedElastic-4] - 信息 - findAll 请求到达
GET 方法在 Schedulers.boundedElastic 线程池中执行(I/O-bound 按照文档工作)
@PostMapping
Mono<User> save(@RequestBody User user) {
log.info("save request arrived");
final Mono<User> newUser = Mono.fromCallable(() -> {
final User userMono = userRepository.save(user);
log.info("user saved!");
return userMono;
});
return newUser.subscribeOn(Schedulers.boundedElastic());
}
日志:[reactor-http-nio-6] - 信息 - 保存请求到达
一个POST方法落在了http-nio线程池上。
@PostMapping("/test")
Mono<User> test() {
log.info("test");
return Mono.just(new User());
}
没有body的POST也落在Schedulers.boundedElastic.
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
reactorResourceFactory.setLoopResources(LoopResources.create("http-nio", 4, true));
reactorResourceFactory.setUseGlobalResources(false);
factory.setResourceFactory(reactorResourceFactory);
factory.setPort(8080);
return factory;
}
这就是我如何配置 http-nio 线程池。
所以,我的问题是:
- 为什么 POST 方法 body 被 http-nio 线程池处理?
- 这个http-nio线程池应该是一个较小的线程池,所以为什么POST方法和body(我认为被认为是阻塞代码)落在他们身上?
- 对 return newUser.subscribeOn(Schedulers.boundedElastic()); 有意义,还是应该保持在同一个线程上?
- 因为 I/O 对数据库或其他服务的 reading/saving 操作应该发生在不同的线程池上。如果您的存储库是反应式的,那么您可以看到它在不同的池上运行,将
http-nio
线程返回到池中。WebClient
也是如此。如果您使用的是包含在 Reactor API 中的阻塞代码,那么您必须确保将 运行 放在不同的线程池中。 - 视情况而定。据我所知,您的存储库不是反应性的,因为它 returns
User
而不是Mono<User
。然后使用不同的线程池是有意义的,这样你就不会阻塞http-nio
线程。但是,为了确保执行线程将被切换,您必须使用 flatMap,因此代码如下所示:
@PostMapping
Mono<User> save(@RequestBody User user) {
log.info("save request arrived");
return Mono.just(user)
.flatMap(user -> saveUser(user));
}
private Mono<User> saveUser(User user) {
return Mono.fromCallable(() -> {
final User userMono = userRepository.save(user);
log.info("user saved!");
return userMono;
}).subscribeOn(Schedulers.boundedElastic());
}
此外,最好使用由您可以控制和监视的线程池支持的调度程序。恕我直言,一个经验法则是为每个资源使用一个专用线程池。因此,例如 1 个用于 Postgres DB 的线程池,1 个用于 google API(REST 调用)的线程池和 1 个用于 GitHub API 的线程池。为什么?如果这些资源中的任何一个出现问题(例如,它将在一定时间内不可用),那么其他线程池上的代码路径 运行 将不会被阻塞,并且您的应用程序将运行,至少对于某些代码路径而言.