在 flux 上同时使用 publishOn 和 subscribeOn 不会导致任何事情发生
Using both publishOn and subscribeOn on a flux results in nothing happening
每当我同时使用 subscribeOn 和 publishOn 时,都不会打印任何内容。
如果我只使用一个它会打印出来。
如果我使用 subscribeOn(Schedulers.immediate()) 或 elastic 它会起作用。
知道这是为什么吗?
据我了解,publishOn 会影响发布的线程和订阅者运行的线程。你能给我指出正确的方向吗?
fun test() {
val testPublisher = EmitterProcessor.create<String>().connect()
testPublisher
.publishOn(Schedulers.elastic())
.map { it ->
println("map on ${Thread.currentThread().name}")
it
}
.subscribeOn(Schedulers.parallel())
.subscribe { println("subscribe on ${Thread.currentThread().name}") }
testPublisher.onNext("a")
testPublisher.onNext("b")
testPublisher.onNext("c")
Thread.sleep(5000)
println("---")
}
subscribeOn
而影响 订阅 发生的位置。即触发源发出元素的初始事件。另一方面,Subscriber
的 onNext
钩子受到链中最近的 publishOn
的影响(很像你的 map
)。
但是 EmitterProcessor
和大多数 Processor
s 一样,更高级,可以做一些工作窃取。我不确定为什么你没有在你的案例中打印任何东西(你的样本转换为 Java 在我的机器上工作),但我敢打赌它与处理器有关。
这段代码可以更好地展示 subscribeOn
与 publishOn
:
Flux.just("a", "b", "c") //this is where subscription triggers data production
//this is influenced by subscribeOn
.doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
.publishOn(Schedulers.elastic())
//the rest is influenced by publishOn
.doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.parallel())
.subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
Thread.sleep(5000);
打印出来:
before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2
每当我同时使用 subscribeOn 和 publishOn 时,都不会打印任何内容。 如果我只使用一个它会打印出来。 如果我使用 subscribeOn(Schedulers.immediate()) 或 elastic 它会起作用。 知道这是为什么吗?
据我了解,publishOn 会影响发布的线程和订阅者运行的线程。你能给我指出正确的方向吗?
fun test() {
val testPublisher = EmitterProcessor.create<String>().connect()
testPublisher
.publishOn(Schedulers.elastic())
.map { it ->
println("map on ${Thread.currentThread().name}")
it
}
.subscribeOn(Schedulers.parallel())
.subscribe { println("subscribe on ${Thread.currentThread().name}") }
testPublisher.onNext("a")
testPublisher.onNext("b")
testPublisher.onNext("c")
Thread.sleep(5000)
println("---")
}
subscribeOn
而影响 订阅 发生的位置。即触发源发出元素的初始事件。另一方面,Subscriber
的 onNext
钩子受到链中最近的 publishOn
的影响(很像你的 map
)。
但是 EmitterProcessor
和大多数 Processor
s 一样,更高级,可以做一些工作窃取。我不确定为什么你没有在你的案例中打印任何东西(你的样本转换为 Java 在我的机器上工作),但我敢打赌它与处理器有关。
这段代码可以更好地展示 subscribeOn
与 publishOn
:
Flux.just("a", "b", "c") //this is where subscription triggers data production
//this is influenced by subscribeOn
.doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
.publishOn(Schedulers.elastic())
//the rest is influenced by publishOn
.doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.parallel())
.subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
Thread.sleep(5000);
打印出来:
before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2