SubscribeOn 或 PublishOn 不适用于 ReactiveCassandraCrudRepository
SubscribeOn or PublishOn not working with ReactiveCassandraCrudRepository
我们正在使用 Reactive Spring Data Repository 和 Spring WebFlux,我对 SubscribeOn 的理解是它决定了 SubscribeOn 之前的操作员将在流量中执行哪个 ThreadPool,而 PublishOn 决定实际订阅将在其上执行的线程池。然而,在下面的代码中,即使使用 PublishOn 和 SubscribeOn,代码也不会在主线程上执行,而是回退到 Cluster-nio-worker-1。
System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
personRepository.findAll().log()
.map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
.subscribeOn(Schedulers.immediate())
.publishOn(Schedulers.immediate())
.subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
excp -> excp.printStackTrace(),
() -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1
还有 Thread[cluster-nio-worker-1,5,main] 是什么意思?为什么这些方法调用不使用主线程执行。
subscribeOn 方法使发布者使用给定的线程池来发布值。管道中可能有 N 个 subscribeOn
方法。最近的一个将生效。 personRepository.findAll().log()
是包装器,returns 是助焊剂。因此,如果它在内部使用任何调度程序,则您不能使用 subscribeOn 更改它。例如,interval
方法使用并行,我无法将其更改为 boundedElastic,如此处所示。
Flux.interval(Duration.ofSeconds(1))
.subscribeOn(Schedulers.boundedElastic())
Schedulers.immediate
只是让流水线在同一个线程中执行。它不是主要的,在你的情况下它将是 cluster-nio-worker
线程池。
我们可以从主线程池切换到任何调度程序线程池。但是我们不能将执行切换回主线程。这不是项目反应器的限制。应该是Java本身的限制。
我们正在使用 Reactive Spring Data Repository 和 Spring WebFlux,我对 SubscribeOn 的理解是它决定了 SubscribeOn 之前的操作员将在流量中执行哪个 ThreadPool,而 PublishOn 决定实际订阅将在其上执行的线程池。然而,在下面的代码中,即使使用 PublishOn 和 SubscribeOn,代码也不会在主线程上执行,而是回退到 Cluster-nio-worker-1。
System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
personRepository.findAll().log()
.map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
.subscribeOn(Schedulers.immediate())
.publishOn(Schedulers.immediate())
.subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
excp -> excp.printStackTrace(),
() -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1
还有 Thread[cluster-nio-worker-1,5,main] 是什么意思?为什么这些方法调用不使用主线程执行。
subscribeOn 方法使发布者使用给定的线程池来发布值。管道中可能有 N 个 subscribeOn
方法。最近的一个将生效。 personRepository.findAll().log()
是包装器,returns 是助焊剂。因此,如果它在内部使用任何调度程序,则您不能使用 subscribeOn 更改它。例如,interval
方法使用并行,我无法将其更改为 boundedElastic,如此处所示。
Flux.interval(Duration.ofSeconds(1))
.subscribeOn(Schedulers.boundedElastic())
Schedulers.immediate
只是让流水线在同一个线程中执行。它不是主要的,在你的情况下它将是 cluster-nio-worker
线程池。
我们可以从主线程池切换到任何调度程序线程池。但是我们不能将执行切换回主线程。这不是项目反应器的限制。应该是Java本身的限制。