PublishSubject `subscribeOn` 行为

PublishSubject `subscribeOn` behavior

为什么 subscribe 从不在此处打印任何内容?只是出于好奇。这无论如何都是不好的做法:我通常会使用 observeOn 代替。但是,我不明白为什么从未达到 subscribe...

val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)

subject
    .map { it + 1 }
    .subscribeOn(Schedulers.computation())
    .subscribe {
        println(Thread.currentThread().name)
        countDownLatch.countDown()
    }

subject.onNext(1)
countDownLatch.await()

为什么会这样

订阅 的过程中,观察者通过 Subscribe 通知向可观察对象发出其准备好接收项目的信号。有关详细信息,请参阅 Observable contract

此外,Subject 文档指出:

Note that a PublishSubject may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time the Subject is created and the observer subscribes to it.

当您尝试通过 .subscribeOn(Schedulers.computation()) 订阅新线程后立即调用 subject.onNext(_) 时,可观察对象(即 subject)可能仍在等待来自观察者的 Subscribe 通知。例如:

subject
    .subscribeOn(Schedulers.computation())
    .subscribe { println("received item") }

// this usually prints nothing!
subject.onNext(1)

但是,如果您在发出第一个项目之前添加一点时间延迟,则可观察对象更有可能在您调用 subject.onNext(_) 之前收到来自观察者的 Subscribe 通知。例如:

subject
    .subscribeOn(Schedulers.computation())
    .subscribe { println("received item") }

// wait for subscription to be established properly
Thread.sleep(1000)

// this usually prints "received item"
subject.onNext(1)

怎么办?

如果您希望您的所有订阅都接收到一个 observable 发出的所有项目,您可以执行以下操作之一:

  • 在调用subject.onNext(_).
  • 之前阻塞主线程等待所有观察者被订阅
  • 创建一个新的可观察对象,等待所有可观察对象都被订阅,然后再在其内部调用 subject.onNext(_)

这些也可能有用:

  • ReplaySubject:这允许您存储所有以前项目的历史记录,并且 re-emit 它们在每个订阅中。缺点:您需要在内存中存储任意数量的项目。
  • ConnectableObservable:这确保了 observable 仅在调用 .connect() 后才发出项目。特别是,.autoConnect(n) 运算符确保可观察对象仅在 n 个观察者成功订阅后才发出。

示例:阻塞主线程直到订阅

val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
val isSubscribedLatch = CountDownLatch(1)

subject
    .subscribeOn(Schedulers.computation())
    .doOnSubscribe { isSubscribedLatch.countDown() }
    .map { it + 1 }
    .subscribe {
        countDownLatch.countDown()
        println(Thread.currentThread().name)
    }

isSubscribedLatch.await()
subject.onNext(1)
countDownLatch.await()