Spring Webflux / Reactor:reactor-http-nio 与 boundedElastic

Spring Webflux / Reactor: reactor-http-nio vs boundedElastic

我对每个请求落在什么线程模型上的某些反应器概念有点困惑。我读了 https://projectreactor.io/docs/core/release/reference 但还是不清楚。让我们看一个例子:

@Autowired
UserRepository userRepository;

@GetMapping
Flux<User> findAll() {
    log.info("findAll request arrived");
    final Flux<User> defer = Flux.defer(() -> {
          return Flux.fromIterable(userRepository.findAll());
    });
    return defer;
}

日志:[boundedElastic-4] - 信息 - findAll 请求到达

GET 方法在 Schedulers.boundedElastic 线程池中执行(I/O-bound 按照文档工作)

@PostMapping
Mono<User> save(@RequestBody User user) {
    log.info("save request arrived");
    final Mono<User> newUser = Mono.fromCallable(() -> {
         final User userMono = userRepository.save(user);
          log.info("user saved!");
          return userMono;
    });
    return newUser.subscribeOn(Schedulers.boundedElastic());
}

日志:[reactor-http-nio-6] - 信息 - 保存请求到达

一个POST方法落在了http-nio线程池上。

@PostMapping("/test")
Mono<User> test() {
    log.info("test");
    return Mono.just(new User());
}

没有body的POST也落在Schedulers.boundedElastic.

@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
    NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
    final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
    reactorResourceFactory.setLoopResources(LoopResources.create("http-nio", 4, true));
    reactorResourceFactory.setUseGlobalResources(false);
    factory.setResourceFactory(reactorResourceFactory);
    factory.setPort(8080);
    return factory;
}

这就是我如何配置 http-nio 线程池。

所以,我的问题是:

  1. 为什么 POST 方法 body 被 http-nio 线程池处理?
  2. 这个http-nio线程池应该是一个较小的线程池,所以为什么POST方法和body(我认为被认为是阻塞代码)落在他们身上?
  3. return newUser.subscribeOn(Schedulers.boundedElastic()); 有意义,还是应该保持在同一个线程上?
  1. 因为 I/O 对数据库或其他服务的 reading/saving 操作应该发生在不同的线程池上。如果您的存储库是反应式的,那么您可以看到它在不同的池上运行,将 http-nio 线程返回到池中。 WebClient 也是如此。如果您使用的是包含在 Reactor API 中的阻塞代码,那么您必须确保将 运行 放在不同的线程池中。
  2. 视情况而定。据我所知,您的存储库不是反应性的,因为它 returns User 而不是 Mono<User。然后使用不同的线程池是有意义的,这样你就不会阻塞 http-nio 线程。但是,为了确保执行线程将被切换,您必须使用 flatMap,因此代码如下所示:
    @PostMapping
    Mono<User> save(@RequestBody User user) {
        log.info("save request arrived");
        return Mono.just(user)
                .flatMap(user -> saveUser(user));
    }
    
    private Mono<User> saveUser(User user) {
        return Mono.fromCallable(() -> {
            final User userMono = userRepository.save(user);
            log.info("user saved!");
            return userMono;
        }).subscribeOn(Schedulers.boundedElastic());
    }

此外,最好使用由您可以控制和监视的线程池支持的调度程序。恕我直言,一个经验法则是为每个资源使用一个专用线程池。因此,例如 1 个用于 Postgres DB 的线程池,1 个用于 google API(REST 调用)的线程池和 1 个用于 GitHub API 的线程池。为什么?如果这些资源中的任何一个出现问题(例如,它将在一定时间内不可用),那么其他线程池上的代码路径 运行 将不会被阻塞,并且您的应用程序将运行,至少对于某些代码路径而言.