每次共享流获得新订阅者时,如何让共享可观察对象从 .startWith() 发出新值?
How to get a shared observable to emit a new value from .startWith() every time a shared stream gets a new subscriber?
我需要一个共享的可观察对象,以便在每次订阅共享流时发出一个新的 .startWith()
值(订阅者计数从 0 变为 1)。不幸的是,.startWith()
返回的值在共享可观察实例的整个生命周期中被重用,即使在这个共享流没有订阅者之后也是如此。
理想情况下,会有一个 .startWith()
重载,它将一个函数作为参数,并在每次订阅者数量从 0 变为 1 时重新执行它。
var count: Int = 0
@Before
fun setUp() {
count = 0 //reset
}
fun getTheCount(): Int {
count++
return count
}
@Test
fun startWithDefaultValue() {
val relay = PublishRelay.create<Int>()
val instance by lazy {
relay
.startWith(getTheCount())
.share()
}
val disposable1 = instance.subscribe {
println(it) //should print 1, and does!
}
disposable1.dispose() //subscriber count on shared instance drops from 1 to 0
//should print 2, but prints 1. getTheCount() is not called again on this subscription
val disposable2 = instance.subscribe {
println(it)
}
}
我猜 val instance by lazy
意味着初始化一次,因此您只调用了一次 getTheCount()
方法。 (我也希望你明白 Observable.just(getTheCount())
将向每个观察者发出相同的值,并且再也不会 "call" getTheCount()
。)
你应该推迟惰性内部:
Observable.defer(() -> relay.startWith(getTheCount()))
.share();
或使用fromCallable
和concatWith:
relay.concatWith(Observable.fromCallable(() -> getTheCount()))
.share();
我需要一个共享的可观察对象,以便在每次订阅共享流时发出一个新的 .startWith()
值(订阅者计数从 0 变为 1)。不幸的是,.startWith()
返回的值在共享可观察实例的整个生命周期中被重用,即使在这个共享流没有订阅者之后也是如此。
理想情况下,会有一个 .startWith()
重载,它将一个函数作为参数,并在每次订阅者数量从 0 变为 1 时重新执行它。
var count: Int = 0
@Before
fun setUp() {
count = 0 //reset
}
fun getTheCount(): Int {
count++
return count
}
@Test
fun startWithDefaultValue() {
val relay = PublishRelay.create<Int>()
val instance by lazy {
relay
.startWith(getTheCount())
.share()
}
val disposable1 = instance.subscribe {
println(it) //should print 1, and does!
}
disposable1.dispose() //subscriber count on shared instance drops from 1 to 0
//should print 2, but prints 1. getTheCount() is not called again on this subscription
val disposable2 = instance.subscribe {
println(it)
}
}
我猜 val instance by lazy
意味着初始化一次,因此您只调用了一次 getTheCount()
方法。 (我也希望你明白 Observable.just(getTheCount())
将向每个观察者发出相同的值,并且再也不会 "call" getTheCount()
。)
你应该推迟惰性内部:
Observable.defer(() -> relay.startWith(getTheCount()))
.share();
或使用fromCallable
和concatWith:
relay.concatWith(Observable.fromCallable(() -> getTheCount()))
.share();