如何使用 netty-reactor 从阻塞调度程序切换回以前的调度程序?
How to switch back from blocking scheduler to previous scheduler using netty-reactor?
如何使用 Spring Webflux + Netty + Reactor 从阻塞调度程序 (blocking-pool) 切换回之前的调度程序 (reactor-http-nio)?
代码:
@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {
private final IBookRepo bookRepo;
private final BlockingPoolConfig blockingPoolConfig;
public Mono<Optional<Book>> getBook(Long id) {
log.debug("getBook() - id: {}", id);
return asyncCallable(() -> {
log.trace("getBook() - invoking bookRepo.findById(id) ...");
return bookRepo.findById(id);
});
}
protected <S> Mono<S> asyncCallable(Callable<S> callable) {
return Mono.fromCallable(callable)
.subscribeOn(blockingPoolConfig.blockingScheduler());
}
}
@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {
private final BookService bookService;
@GetMapping("/book/{id}")
public Mono<Book> get(@PathVariable Long id) {
log.debug("get() - id: {}", id);
return bookService.getBook(id)
.publishOn(Schedulers.parallel()) //publishOn(... ?)
.map(optionalBook -> {
return optionalBook.map(book -> {
log.debug("get() result: {}", book);
return book;
}).orElseThrow(() -> {
log.debug("book with id: {} is not found.", id);
return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
});
});
}
@Configuration
@Slf4j
public class BlockingPoolConfig {
@Value("${spring.datasource.maximumPoolSize:8}")
private int connectionPoolSize = 1;
@Scope("singleton")
@Bean
public Scheduler blockingScheduler() {
Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
return scheduler;
}
}
上面我使用的是 publishOn(Schedulers.parallel())
,但是这个创建了新的线程池(并行)。相反,我更喜欢切换 reactor-http-nio 线程池。
实际结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
预期结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
目前这是不可能的,因为 A) 这些 HTTP 线程不是由 Reactor Scheduler
控制的,而是由底层的 Netty 事件循环本身控制的,并且 B) [=21= 中没有通用的方法] 到“return 执行一个(任意)线程”,如果该线程没有与之关联的 Executor
/ExecutorService
。
对于 reactor-netty,一旦您退出了 HTTP 线程,就没有理由再切换回 Netty 线程了。一旦发送响应,reactor-netty 将自然完成。
假设阻塞池类似于 Schedulers.boundedElastic()
,您可能确实想转到 Schedulers.parallel()
来限制阻塞线程的寿命,这是一个完美的解决方案。
如何使用 Spring Webflux + Netty + Reactor 从阻塞调度程序 (blocking-pool) 切换回之前的调度程序 (reactor-http-nio)?
代码:
@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {
private final IBookRepo bookRepo;
private final BlockingPoolConfig blockingPoolConfig;
public Mono<Optional<Book>> getBook(Long id) {
log.debug("getBook() - id: {}", id);
return asyncCallable(() -> {
log.trace("getBook() - invoking bookRepo.findById(id) ...");
return bookRepo.findById(id);
});
}
protected <S> Mono<S> asyncCallable(Callable<S> callable) {
return Mono.fromCallable(callable)
.subscribeOn(blockingPoolConfig.blockingScheduler());
}
}
@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {
private final BookService bookService;
@GetMapping("/book/{id}")
public Mono<Book> get(@PathVariable Long id) {
log.debug("get() - id: {}", id);
return bookService.getBook(id)
.publishOn(Schedulers.parallel()) //publishOn(... ?)
.map(optionalBook -> {
return optionalBook.map(book -> {
log.debug("get() result: {}", book);
return book;
}).orElseThrow(() -> {
log.debug("book with id: {} is not found.", id);
return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
});
});
}
@Configuration
@Slf4j
public class BlockingPoolConfig {
@Value("${spring.datasource.maximumPoolSize:8}")
private int connectionPoolSize = 1;
@Scope("singleton")
@Bean
public Scheduler blockingScheduler() {
Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
return scheduler;
}
}
上面我使用的是 publishOn(Schedulers.parallel())
,但是这个创建了新的线程池(并行)。相反,我更喜欢切换 reactor-http-nio 线程池。
实际结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
预期结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
目前这是不可能的,因为 A) 这些 HTTP 线程不是由 Reactor Scheduler
控制的,而是由底层的 Netty 事件循环本身控制的,并且 B) [=21= 中没有通用的方法] 到“return 执行一个(任意)线程”,如果该线程没有与之关联的 Executor
/ExecutorService
。
对于 reactor-netty,一旦您退出了 HTTP 线程,就没有理由再切换回 Netty 线程了。一旦发送响应,reactor-netty 将自然完成。
假设阻塞池类似于 Schedulers.boundedElastic()
,您可能确实想转到 Schedulers.parallel()
来限制阻塞线程的寿命,这是一个完美的解决方案。