在项目反应器中包装阻塞 I/O
Wrapping blocking I/O in project reactor
我有一个 spring-webflux API,它在服务层需要从使用 JDBC.
的现有存储库中读取
阅读了一些有关该主题的文章后,我想将阻塞数据库调用的执行与其余的非阻塞异步代码分开。
我已经定义了一个专用的jdbc调度器:
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}
以及使用它的 AsyncWrapper 实用程序:
@Component
public class AsyncJdbcWrapper {
private final Scheduler jdbcScheduler;
@Autowired
public AsyncJdbcWrapper(Scheduler jdbcScheduler) {
this.jdbcScheduler = jdbcScheduler;
}
public <T> Mono<T> async(Callable<T> callable) {
return Mono.fromCallable(callable)
.subscribeOn(jdbcScheduler)
.publishOn(Schedulers.parallel());
}
}
然后用于包装 jdbc 调用,如下所示:
Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
.map(userOption -> userOption.map(u -> u.getId())
.orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));
我有两个问题:
1) 我是否正确地将阻塞调用的执行推送到另一组线程?作为对这些东西的新手,我正在为 subscribeOn()/publishOn() 的复杂性而苦苦挣扎。
2) 假设我想使用生成的单声道,例如用 userIdMono 的结果调用 API,它将在哪个调度程序上执行?是专门为 jdbc 调用创建的线程,还是反应堆通常在其中运行的主(?)线程?例如
userIdMono.map(id -> someApiClient.call(id));
1) 使用 subscribeOn
正确地将 JDBC 工作放在 jdbcScheduler
上
2) Callable
的结果——在 jdbcScheduler 上计算时,publishOn
是 parallel
调度器,所以你的 map
将在来自 Schedulers.parallel()
池的线程(而不是占用 jdbcScheduler
)。
我有一个 spring-webflux API,它在服务层需要从使用 JDBC.
的现有存储库中读取阅读了一些有关该主题的文章后,我想将阻塞数据库调用的执行与其余的非阻塞异步代码分开。
我已经定义了一个专用的jdbc调度器:
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}
以及使用它的 AsyncWrapper 实用程序:
@Component
public class AsyncJdbcWrapper {
private final Scheduler jdbcScheduler;
@Autowired
public AsyncJdbcWrapper(Scheduler jdbcScheduler) {
this.jdbcScheduler = jdbcScheduler;
}
public <T> Mono<T> async(Callable<T> callable) {
return Mono.fromCallable(callable)
.subscribeOn(jdbcScheduler)
.publishOn(Schedulers.parallel());
}
}
然后用于包装 jdbc 调用,如下所示:
Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
.map(userOption -> userOption.map(u -> u.getId())
.orElseThrow(() -> new IllegalArgumentException("Unable to find user with ID " + request.getUserId())));
我有两个问题:
1) 我是否正确地将阻塞调用的执行推送到另一组线程?作为对这些东西的新手,我正在为 subscribeOn()/publishOn() 的复杂性而苦苦挣扎。
2) 假设我想使用生成的单声道,例如用 userIdMono 的结果调用 API,它将在哪个调度程序上执行?是专门为 jdbc 调用创建的线程,还是反应堆通常在其中运行的主(?)线程?例如
userIdMono.map(id -> someApiClient.call(id));
1) 使用 subscribeOn
正确地将 JDBC 工作放在 jdbcScheduler
2) Callable
的结果——在 jdbcScheduler 上计算时,publishOn
是 parallel
调度器,所以你的 map
将在来自 Schedulers.parallel()
池的线程(而不是占用 jdbcScheduler
)。