在 Spring Webflux 中执行阻塞 JDBC 调用

Execute blocking JDBC call in Spring Webflux

我正在使用 Spring Webflux 和 Spring 数据 jpa,使用 PostgreSql 作为后端数据库。 我不想在进行 findsave 等数据库调用时阻塞主线程。 为了达到同样的目的,我在 Controller class 中有一个主调度程序和一个 jdbcScheduler 服务 classes.

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

现在,在我的服务层中进行 get/save 调用时,我会:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

在控制器中,我这样做:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

这是正确的吗? and/or有更好的方法吗?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

是该任务将 运行 在 jdbcScheduler 线程上,稍后结果将在我的主并行 scheduler 上发布。这个理解对吗?

您对 publishOnsubscribeOn (see reference documentation in the reactor project about those operators) 的理解是正确的。

如果您调用阻塞库而不在特定调度程序上调度工作,这些调用将阻塞为数不多的可用线程之一(默认情况下为 Netty 事件循环),您的应用程序将只能处理少数请求同时

现在我不确定你这样做的目的是什么。

首先,parallel scheduler is designed for CPU bound tasks,这意味着它们的数量将很少,与 CPU 个核心一样多(或多一点)。在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的核心数。您的应用将无法处理大量并发请求。

即使您选择更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环好,后者是在 Spring WebFlux 中本地安排请求处理的地方。

如果您的最终目标是性能和可伸缩性,那么在反应式应用程序中包装阻塞调用的性能可能比常规 Servlet 容器更差。

您可以改为使用 Spring MVC 和:

  • 在处理阻塞库时使用通常的阻塞 return 类型,例如 JPA
  • 当您不依赖于此类库时,请使用 MonoFlux return 类型

这不是非阻塞的,但仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。

恕我直言,有一种方法可以执行此操作,从而更好地利用机器资源。按照文档,您可以 wrap the call 在其他线程中,并且您可以继续执行。