PublishSubject `subscribeOn` 行为
PublishSubject `subscribeOn` behavior
为什么 subscribe
从不在此处打印任何内容?只是出于好奇。这无论如何都是不好的做法:我通常会使用 observeOn
代替。但是,我不明白为什么从未达到 subscribe
...
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
subject
.map { it + 1 }
.subscribeOn(Schedulers.computation())
.subscribe {
println(Thread.currentThread().name)
countDownLatch.countDown()
}
subject.onNext(1)
countDownLatch.await()
为什么会这样
在 订阅 的过程中,观察者通过 Subscribe
通知向可观察对象发出其准备好接收项目的信号。有关详细信息,请参阅 Observable
contract。
此外,Subject
文档指出:
Note that a PublishSubject
may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time the Subject
is created and the observer subscribes to it.
当您尝试通过 .subscribeOn(Schedulers.computation())
订阅新线程后立即调用 subject.onNext(_)
时,可观察对象(即 subject
)可能仍在等待来自观察者的 Subscribe
通知。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// this usually prints nothing!
subject.onNext(1)
但是,如果您在发出第一个项目之前添加一点时间延迟,则可观察对象更有可能在您调用 subject.onNext(_)
之前收到来自观察者的 Subscribe
通知。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// wait for subscription to be established properly
Thread.sleep(1000)
// this usually prints "received item"
subject.onNext(1)
怎么办?
如果您希望您的所有订阅都接收到一个 observable 发出的所有项目,您可以执行以下操作之一:
- 在调用
subject.onNext(_)
. 之前阻塞主线程等待所有观察者被订阅
- 创建一个新的可观察对象,等待所有可观察对象都被订阅,然后再在其内部调用
subject.onNext(_)
。
这些也可能有用:
ReplaySubject
:这允许您存储所有以前项目的历史记录,并且 re-emit 它们在每个订阅中。缺点:您需要在内存中存储任意数量的项目。
ConnectableObservable
:这确保了 observable 仅在调用 .connect()
后才发出项目。特别是,.autoConnect(n)
运算符确保可观察对象仅在 n
个观察者成功订阅后才发出。
示例:阻塞主线程直到订阅
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
val isSubscribedLatch = CountDownLatch(1)
subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe { isSubscribedLatch.countDown() }
.map { it + 1 }
.subscribe {
countDownLatch.countDown()
println(Thread.currentThread().name)
}
isSubscribedLatch.await()
subject.onNext(1)
countDownLatch.await()
为什么 subscribe
从不在此处打印任何内容?只是出于好奇。这无论如何都是不好的做法:我通常会使用 observeOn
代替。但是,我不明白为什么从未达到 subscribe
...
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
subject
.map { it + 1 }
.subscribeOn(Schedulers.computation())
.subscribe {
println(Thread.currentThread().name)
countDownLatch.countDown()
}
subject.onNext(1)
countDownLatch.await()
为什么会这样
在 订阅 的过程中,观察者通过 Subscribe
通知向可观察对象发出其准备好接收项目的信号。有关详细信息,请参阅 Observable
contract。
此外,Subject
文档指出:
Note that a
PublishSubject
may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time theSubject
is created and the observer subscribes to it.
当您尝试通过 .subscribeOn(Schedulers.computation())
订阅新线程后立即调用 subject.onNext(_)
时,可观察对象(即 subject
)可能仍在等待来自观察者的 Subscribe
通知。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// this usually prints nothing!
subject.onNext(1)
但是,如果您在发出第一个项目之前添加一点时间延迟,则可观察对象更有可能在您调用 subject.onNext(_)
之前收到来自观察者的 Subscribe
通知。例如:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// wait for subscription to be established properly
Thread.sleep(1000)
// this usually prints "received item"
subject.onNext(1)
怎么办?
如果您希望您的所有订阅都接收到一个 observable 发出的所有项目,您可以执行以下操作之一:
- 在调用
subject.onNext(_)
. 之前阻塞主线程等待所有观察者被订阅
- 创建一个新的可观察对象,等待所有可观察对象都被订阅,然后再在其内部调用
subject.onNext(_)
。
这些也可能有用:
ReplaySubject
:这允许您存储所有以前项目的历史记录,并且 re-emit 它们在每个订阅中。缺点:您需要在内存中存储任意数量的项目。ConnectableObservable
:这确保了 observable 仅在调用.connect()
后才发出项目。特别是,.autoConnect(n)
运算符确保可观察对象仅在n
个观察者成功订阅后才发出。
示例:阻塞主线程直到订阅
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
val isSubscribedLatch = CountDownLatch(1)
subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe { isSubscribedLatch.countDown() }
.map { it + 1 }
.subscribe {
countDownLatch.countDown()
println(Thread.currentThread().name)
}
isSubscribedLatch.await()
subject.onNext(1)
countDownLatch.await()