反应式编程(Reactor):为什么主线程卡住了?
Reactive programming (Reactor) : Why main thread is stuck?
我正在使用 project-reactor 学习响应式编程。
我有以下测试用例:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
执行测试发现主线程卡住了5秒
我希望订阅的消费者应该 运行 在自己的线程中异步调用,也就是说,订阅调用应该 return 立即到主线程,因此 hello main thread
应该立即打印。
如果您有一个异步的可观察对象 (Flux),就会出现这种情况。您选择通过使用 just 方法使用具有两个现成可用值的 Flux。由于它们立即可用,因此立即传递给订阅对象。
主线程卡住了,因为订阅发生在 main
线程上。如果您希望它异步地 运行,则需要在 main
以外的线程上进行订阅。您可以这样做:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
注意:我使用了parallel
线程池。你可以使用任何你喜欢的游泳池。 Reactor 的管道默认在调用线程上执行(不像 CompletableFuture<T>
默认在 ForkJoin
池中的 运行s)。
来自 spring.io documentation
The Threading Model
Reactor operators generally are concurrent agnostic: they don’t impose a particular threading model and just run on the Thread on which their onNext method was invoked.
The Scheduler abstraction
In Reactor, a Scheduler is an abstraction that gives the user control about threading. A Scheduler can spawn Worker which are conceptually Threads, but are not necessarily backed by a Thread (we’ll see an example of that later). A Scheduler also includes the notion of a clock, whereas the Worker is purely about scheduling tasks.
所以您应该通过 subscribeOn
方法订阅不同的线程,并且 Thread.sleep(5000)
将使调度程序的线程休眠。您可以在文档中看到更多类似的示例。
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
我正在使用 project-reactor 学习响应式编程。
我有以下测试用例:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
执行测试发现主线程卡住了5秒
我希望订阅的消费者应该 运行 在自己的线程中异步调用,也就是说,订阅调用应该 return 立即到主线程,因此 hello main thread
应该立即打印。
如果您有一个异步的可观察对象 (Flux),就会出现这种情况。您选择通过使用 just 方法使用具有两个现成可用值的 Flux。由于它们立即可用,因此立即传递给订阅对象。
主线程卡住了,因为订阅发生在 main
线程上。如果您希望它异步地 运行,则需要在 main
以外的线程上进行订阅。您可以这样做:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
注意:我使用了parallel
线程池。你可以使用任何你喜欢的游泳池。 Reactor 的管道默认在调用线程上执行(不像 CompletableFuture<T>
默认在 ForkJoin
池中的 运行s)。
来自 spring.io documentation
The Threading Model Reactor operators generally are concurrent agnostic: they don’t impose a particular threading model and just run on the Thread on which their onNext method was invoked.
The Scheduler abstraction In Reactor, a Scheduler is an abstraction that gives the user control about threading. A Scheduler can spawn Worker which are conceptually Threads, but are not necessarily backed by a Thread (we’ll see an example of that later). A Scheduler also includes the notion of a clock, whereas the Worker is purely about scheduling tasks.
所以您应该通过 subscribeOn
方法订阅不同的线程,并且 Thread.sleep(5000)
将使调度程序的线程休眠。您可以在文档中看到更多类似的示例。
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));