在项目反应器中包装阻塞 I/O

Wrapping blocking I/O in project reactor

我有一个 spring-webflux API,它在服务层需要从使用 JDBC.

的现有存储库中读取

阅读了一些有关该主题的文章后,我想将阻塞数据库调用的执行与其余的非阻塞异步代码分开。

我已经定义了一个专用的jdbc调度器:

@Bean
public Scheduler jdbcScheduler() {
    return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}

以及使用它的 AsyncWrapper 实用程序:

@Component
public class AsyncJdbcWrapper {

    private final Scheduler jdbcScheduler;

    @Autowired
    public AsyncJdbcWrapper(Scheduler jdbcScheduler) {

        this.jdbcScheduler = jdbcScheduler;
    }

    public <T> Mono<T> async(Callable<T> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(jdbcScheduler)
                .publishOn(Schedulers.parallel());
    }
}

然后用于包装 jdbc 调用,如下所示:

Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
                .map(userOption -> userOption.map(u -> u.getId())
                        .orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));

我有两个问题:

1) 我是否正确地将阻塞调用的执行推送到另一组线程?作为对这些东西的新手,我正在为 subscribeOn()/publishOn() 的复杂性而苦苦挣扎。

2) 假设我想使用生成的单声道,例如用 userIdMono 的结果调用 API,它将在哪个调度程序上执行?是专门为 jdbc 调用创建的线程,还是反应堆通常在其中运行的主(?)线程?例如

userIdMono.map(id -> someApiClient.call(id));

1) 使用 subscribeOn 正确地将 JDBC 工作放在 jdbcScheduler

2) Callable 的结果——在 jdbcScheduler 上计算时,publishOnparallel 调度器,所以你的 map 将在来自 Schedulers.parallel() 池的线程(而不是占用 jdbcScheduler)。