如何使用 netty-reactor 从阻塞调度程序切换回以前的调度程序?

How to switch back from blocking scheduler to previous scheduler using netty-reactor?

如何使用 Spring Webflux + Netty + Reactor 从阻塞调度程序 (blocking-pool) 切换回之前的调度程序 (reactor-http-nio)?

代码:

@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {

    private final IBookRepo bookRepo;

    private final BlockingPoolConfig blockingPoolConfig;

    public Mono<Optional<Book>> getBook(Long id) {
        log.debug("getBook() - id: {}", id);
        return asyncCallable(() -> {
            log.trace("getBook() - invoking bookRepo.findById(id) ...");
            return bookRepo.findById(id);
        });
    }

    protected <S> Mono<S> asyncCallable(Callable<S> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(blockingPoolConfig.blockingScheduler()); 
    }
}

@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {

    private final BookService bookService;

    @GetMapping("/book/{id}")
    public Mono<Book> get(@PathVariable Long id) {
        log.debug("get() - id: {}", id);
        return bookService.getBook(id)
                .publishOn(Schedulers.parallel())  //publishOn(... ?)
                .map(optionalBook -> {
                    return optionalBook.map(book -> {
                        log.debug("get() result: {}", book);
                        return book;
                    }).orElseThrow(() -> {
                        log.debug("book with id: {} is not found.", id);
                        return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
                    });
                });
    }

@Configuration
@Slf4j
public class BlockingPoolConfig {

    @Value("${spring.datasource.maximumPoolSize:8}")
    private int connectionPoolSize = 1;

    @Scope("singleton")
    @Bean
    public Scheduler blockingScheduler() {
        Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
        return scheduler;
    }
}

上面我使用的是 publishOn(Schedulers.parallel()),但是这个创建了新的线程池(并行)。相反,我更喜欢切换 reactor-http-nio 线程池。

实际结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2       ] DEBUG t.a.p.service.BookService          - getBook() - id: 1
19:17:45.316 [blocking-pool-1          ] TRACE t.a.p.service.BookService          - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2               ] DEBUG t.a.p.controller.BookController    - get() result: Book(id=1, title=Abc)

预期结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2       ] DEBUG t.a.p.service.BookService          - getBook() - id: 1
19:17:45.316 [blocking-pool-1          ] TRACE t.a.p.service.BookService          - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() result: Book(id=1, title=Abc)

目前这是不可能的,因为 A) 这些 HTTP 线程不是由 Reactor Scheduler 控制的,而是由底层的 Netty 事件循环本身控制的,并且 B) [=21= 中没有通用的方法] 到“return 执行一个(任意)线程”,如果该线程没有与之关联的 Executor/ExecutorService

对于 reactor-netty,一旦您退出了 HTTP 线程,就没有理由再切换回 Netty 线程了。一旦发送响应,reactor-netty 将自然完成。

假设阻塞池类似于 Schedulers.boundedElastic(),您可能确实想转到 Schedulers.parallel() 来限制阻塞线程的寿命,这是一个完美的解决方案。