每次共享流获得新订阅者时,如何让共享可观察对象从 .startWith() 发出新值?

How to get a shared observable to emit a new value from .startWith() every time a shared stream gets a new subscriber?

我需要一个共享的可观察对象,以便在每次订阅共享流时发出一个新的 .startWith() 值(订阅者计数从 0 变为 1)。不幸的是,.startWith() 返回的值在共享可观察实例的整个生命周期中被重用,即使在这个共享流没有订阅者之后也是如此。

理想情况下,会有一个 .startWith() 重载,它将一个函数作为参数,并在每次订阅者数量从 0 变为 1 时重新执行它。

var count: Int = 0

@Before
fun setUp() {
    count = 0 //reset
}

fun getTheCount(): Int {
    count++
    return count
}

@Test
fun startWithDefaultValue() {
    val relay = PublishRelay.create<Int>()

    val instance by lazy {
        relay
                .startWith(getTheCount())
                .share()
    }

    val disposable1 = instance.subscribe {
        println(it) //should print 1, and does!
    }

    disposable1.dispose() //subscriber count on shared instance drops from 1 to 0

    //should print 2, but prints 1. getTheCount() is not called again on this subscription
    val disposable2 = instance.subscribe {
        println(it) 
    }
}

我猜 val instance by lazy 意味着初始化一次,因此您只调用了一次 getTheCount() 方法。 (我也希望你明白 Observable.just(getTheCount()) 将向每个观察者发出相同的值,并且再也不会 "call" getTheCount()。)

你应该推迟惰性内部:

Observable.defer(() -> relay.startWith(getTheCount()))
.share();

或使用fromCallable和concatWith:

relay.concatWith(Observable.fromCallable(() -> getTheCount()))
.share();