为什么 subscribeOn 对 Rxjava 中的 PublishSubject 没有影响?
Why doesn't subscribeOn effect on PublishSubject in Rxjava?
这是我在 Kotlin 中的测试代码:
fun main() {
rxjava()
}
fun rxjava() {
val queuSubject = PublishSubject.create<String>()
queuSubject
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})
for (i in 1..3) {
Thread {
queuSubject.onNext("$i")
}.start()
}
Thread.sleep(15000)
}
我试图在不同的 IO 线程中 运行 map
阻塞和 subscribe's onNext
阻塞。但是输出是这样的:
map 3 called Thread-2
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0
thread in subscription RxCachedThreadScheduler-2
如您所见,调用subscribeOn
似乎对PublishSubject's
流没有影响,thread-0,thread-1 and thread-2
指的是调用onNext
方法的线程。
另外考虑下面的代码:
fun main() {
rxjava()
}
fun rxjava() {
val queuSubject = PublishSubject.create<String>()
queuSubject
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})
queuSubject.onNext("1")
queuSubject.onNext("2")
queuSubject.onNext("3")
Thread.sleep(15000)
}
我写了上面的代码,看到没有打印输出。但是,如果我从流中删除 subscribeOn
,消息将按如下顺序打印:
map 1 called main
thread in subscription RxCachedThreadScheduler-1
map 2 called main
thread in subscription RxCachedThreadScheduler-1
map 3 called main
thread in subscription RxCachedThreadScheduler-1
这些代码有什么问题?谢谢。
因为 subscribeOn
只影响源的订阅副作用。如果源在观察者订阅时立即开始发出事件,就会出现这种副作用:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v)
.blockingSubscribe();
PublishSubject
没有订阅副作用,因为它仅将信号从其 onXXX
方法中继到观察者的 onXXX
方法。
但是,subscribeOn
具有时间效应,因为它延迟了对源的实际订阅,因此在 PublishSubject
的情况下,它可能无法及时看到已注册的观察者,其他线程调用它的观察者onXXX
方法。
如果要将处理从原始线程中移出,请使用 observeOn
:
val queuSubject = PublishSubject.create<String>()
queuSubject
.observeOn(Schedulers.io()) // <----------------------------------------
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})
这是我在 Kotlin 中的测试代码:
fun main() {
rxjava()
}
fun rxjava() {
val queuSubject = PublishSubject.create<String>()
queuSubject
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})
for (i in 1..3) {
Thread {
queuSubject.onNext("$i")
}.start()
}
Thread.sleep(15000)
}
我试图在不同的 IO 线程中 运行 map
阻塞和 subscribe's onNext
阻塞。但是输出是这样的:
map 3 called Thread-2
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0
thread in subscription RxCachedThreadScheduler-2
如您所见,调用subscribeOn
似乎对PublishSubject's
流没有影响,thread-0,thread-1 and thread-2
指的是调用onNext
方法的线程。
另外考虑下面的代码:
fun main() {
rxjava()
}
fun rxjava() {
val queuSubject = PublishSubject.create<String>()
queuSubject
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})
queuSubject.onNext("1")
queuSubject.onNext("2")
queuSubject.onNext("3")
Thread.sleep(15000)
}
我写了上面的代码,看到没有打印输出。但是,如果我从流中删除 subscribeOn
,消息将按如下顺序打印:
map 1 called main
thread in subscription RxCachedThreadScheduler-1
map 2 called main
thread in subscription RxCachedThreadScheduler-1
map 3 called main
thread in subscription RxCachedThreadScheduler-1
这些代码有什么问题?谢谢。
因为 subscribeOn
只影响源的订阅副作用。如果源在观察者订阅时立即开始发出事件,就会出现这种副作用:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v)
.blockingSubscribe();
PublishSubject
没有订阅副作用,因为它仅将信号从其 onXXX
方法中继到观察者的 onXXX
方法。
但是,subscribeOn
具有时间效应,因为它延迟了对源的实际订阅,因此在 PublishSubject
的情况下,它可能无法及时看到已注册的观察者,其他线程调用它的观察者onXXX
方法。
如果要将处理从原始线程中移出,请使用 observeOn
:
val queuSubject = PublishSubject.create<String>()
queuSubject
.observeOn(Schedulers.io()) // <----------------------------------------
.map { t ->
val a = t.toLong()
Thread.sleep(6000 / a)
println("map $a called ${Thread.currentThread().name} ")
a
}
.observeOn(Schedulers.io())
.subscribe({
println("thread in subscription ${Thread.currentThread().name}")
}, {
println("error ${it.message}")
})