SubscribeOn 或 PublishOn 不适用于 ReactiveCassandraCrudRepository

SubscribeOn or PublishOn not working with ReactiveCassandraCrudRepository

我们正在使用 Reactive Spring Data Repository 和 Spring WebFlux,我对 SubscribeOn 的理解是它决定了 SubscribeOn 之前的操作员将在流量中执行哪个 ThreadPool,而 PublishOn 决定实际订阅将在其上执行的线程池。然而,在下面的代码中,即使使用 PublishOn 和 SubscribeOn,代码也不会在主线程上执行,而是回退到 Cluster-nio-worker-1。

    System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

还有 Thread[cluster-nio-worker-1,5,main] 是什么意思?为什么这些方法调用不使用主线程执行。

subscribeOn 方法使发布者使用给定的线程池来发布值。管道中可能有 N 个 subscribeOn 方法。最近的一个将生效。 personRepository.findAll().log() 是包装器,returns 是助焊剂。因此,如果它在内部使用任何调度程序,则您不能使用 subscribeOn 更改它。例如,interval 方法使用并行,我无法将其更改为 boundedElastic,如此处所示。

    Flux.interval(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())

Schedulers.immediate 只是让流水线在同一个线程中执行。它不是主要的,在你的情况下它将是 cluster-nio-worker 线程池。

我们可以从主线程池切换到任何调度程序线程池。但是我们不能将执行切换回主线程。这不是项目反应器的限制。应该是Java本身的限制。