反应式编程(Reactor):为什么主线程卡住了?

Reactive programming (Reactor) : Why main thread is stuck?

我正在使用 project-reactor 学习响应式编程。

我有以下测试用例:

@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}

执行测试发现主线程卡住了5秒

我希望订阅的消费者应该 运行 在自己的线程中异步调用,也就是说,订阅调用应该 return 立即到主线程,因此 hello main thread 应该立即打印。

如果您有一个异步的可观察对象 (Flux),就会出现这种情况。您选择通过使用 just 方法使用具有两个现成可用值的 Flux。由于它们立即可用,因此立即传递给订阅对象。

主线程卡住了,因为订阅发生在 main 线程上。如果您希望它异步地 运行,则需要在 main 以外的线程上进行订阅。您可以这样做:

 @Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}

注意:我使用了parallel线程池。你可以使用任何你喜欢的游泳池。 Reactor 的管道默认在调用线程上执行(不像 CompletableFuture<T> 默认在 ForkJoin 池中的 运行s)。

来自 spring.io documentation

The Threading Model Reactor operators generally are concurrent agnostic: they don’t impose a particular threading model and just run on the Thread on which their onNext method was invoked.

The Scheduler abstraction In Reactor, a Scheduler is an abstraction that gives the user control about threading. A Scheduler can spawn Worker which are conceptually Threads, but are not necessarily backed by a Thread (we’ll see an example of that later). A Scheduler also includes the notion of a clock, whereas the Worker is purely about scheduling tasks.

所以您应该通过 subscribeOn 方法订阅不同的线程,并且 Thread.sleep(5000) 将使调度程序的线程休眠。您可以在文档中看到更多类似的示例。

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));