RxJava:onBackpressureBlock() 奇怪的行为

RxJava: onBackpressureBlock() strange behavior

我正在玩弄 RxJava(准确地说是 RxKotlin)。这里我有以下 Observables:

fun metronome(ms: Int) = observable<Int> {
    var i = 0;
    while (true) {
        if (ms > 0) {
            Thread.sleep(ms.toLong())
        }
        if (it.isUnsubscribed()) {
            break
        }
        it.onNext(++i)
    }
}

我想合并其中的一些并同时 运行ning。它们忽略背压,因此必须对它们应用背压运算符。

然后我创建

val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)

然后我合并 metronomes:

val o = Observable.merge(listOf(metronome(0),
                                metronome(1000).map { "---------" })
                         .map { it.onBackpressureBlock().subscribeOn(scheduler) })
                  .take(5000, TimeUnit.MILLISECONDS)

第一个应该不断地发射物品。 如果我在 运行 的最后 3 秒内这样做,我会得到以下输出:

...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------

似乎 Observable 订阅了同一个线程,第一个 observable 被阻塞了 3 秒以上。

但是当我交换 onBackpressureBlock()subscribeOn(scheduler) 调用时,输出变成了我所期望的,输出在整个执行过程中被合并。

对我来说很明显 RxJava 中的调用顺序很重要,但我不太明白在这种特殊情况下会发生什么。

那么当应用 onBackpressureBlock 运算符时会发生什么 before subscribeOn 如果 after 呢?

onBackpressureBlock运算符是一个失败的实验;需要注意在哪里申请。例如,subscribeOn().onBackpressureBlock() 有效,但反之则不然。

RxJava 有称为 interval 的非阻塞周期性定时器,所以你不需要自己动手。