打印日志时,项目反应器始终在主线程中运行
project reactor always runs in the main thread when the logs get prints
我是 Project Reactor 的新手,在研究 Project Reactor 时,我观察到一件事,总是以打印项目主管日志为主要内容。如果 reactor 运行s 不关心线程,那是怎么发生的?以及我如何在不同的不同线程上获得经过验证的反应代码 运行?
作为默认 Reactor 运行 基于您订阅的线程。如果您从自己的线程订阅,反应堆 运行 将基于该线程。也就是说不是每次都是主线程。
你可以 运行 这个测试代码来验证。这是在主线程上创建的通量。那个流量是从另一个新创建的线程订阅的。在 运行 应用程序之后,查看日志。日志会证明这一点。日志已 运行 在我们新创建的新线程上。
public void testTheThread() throws InterruptedException {
//the flux created on the main thread.
Flux<String> stringFlux = Flux
.fromArray(new String[]{"a", "b"})
.map(String::toUpperCase)
.log();
//the subscriber runs on another new thread. [my-new-thread]
Thread newThread = new Thread(() -> {
stringFlux.subscribe(s -> {
System.out.println("String is = " + s);
});
});
newThread.setName("my-new-thread");
newThread.start();
//sleep the main thread until get the data from my-new-thread
//otherwise the log will not be printed.
Thread.sleep(1000);
}
日志将是这样的。
17:04:30.867 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
17:04:30.874 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
17:04:30.874 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onNext(A)
s = A
17:04:30.880 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onNext(B)
s = B
17:04:30.881 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onComplete()
Process finished with exit code 0
我是 Project Reactor 的新手,在研究 Project Reactor 时,我观察到一件事,总是以打印项目主管日志为主要内容。如果 reactor 运行s 不关心线程,那是怎么发生的?以及我如何在不同的不同线程上获得经过验证的反应代码 运行?
作为默认 Reactor 运行 基于您订阅的线程。如果您从自己的线程订阅,反应堆 运行 将基于该线程。也就是说不是每次都是主线程。 你可以 运行 这个测试代码来验证。这是在主线程上创建的通量。那个流量是从另一个新创建的线程订阅的。在 运行 应用程序之后,查看日志。日志会证明这一点。日志已 运行 在我们新创建的新线程上。
public void testTheThread() throws InterruptedException {
//the flux created on the main thread.
Flux<String> stringFlux = Flux
.fromArray(new String[]{"a", "b"})
.map(String::toUpperCase)
.log();
//the subscriber runs on another new thread. [my-new-thread]
Thread newThread = new Thread(() -> {
stringFlux.subscribe(s -> {
System.out.println("String is = " + s);
});
});
newThread.setName("my-new-thread");
newThread.start();
//sleep the main thread until get the data from my-new-thread
//otherwise the log will not be printed.
Thread.sleep(1000);
}
日志将是这样的。
17:04:30.867 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
17:04:30.874 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
17:04:30.874 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onNext(A)
s = A
17:04:30.880 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onNext(B)
s = B
17:04:30.881 [my-new-thread] INFO reactor.Flux.MapFuseable.1 - | onComplete()
Process finished with exit code 0