RxJava:onBackpressureBlock() 奇怪的行为
RxJava: onBackpressureBlock() strange behavior
我正在玩弄 RxJava(准确地说是 RxKotlin)。这里我有以下 Observable
s:
fun metronome(ms: Int) = observable<Int> {
var i = 0;
while (true) {
if (ms > 0) {
Thread.sleep(ms.toLong())
}
if (it.isUnsubscribed()) {
break
}
it.onNext(++i)
}
}
我想合并其中的一些并同时 运行ning。它们忽略背压,因此必须对它们应用背压运算符。
然后我创建
val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)
然后我合并 metronome
s:
val o = Observable.merge(listOf(metronome(0),
metronome(1000).map { "---------" })
.map { it.onBackpressureBlock().subscribeOn(scheduler) })
.take(5000, TimeUnit.MILLISECONDS)
第一个应该不断地发射物品。
如果我在 运行 的最后 3 秒内这样做,我会得到以下输出:
...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
似乎 Observable
订阅了同一个线程,第一个 observable 被阻塞了 3 秒以上。
但是当我交换 onBackpressureBlock()
和 subscribeOn(scheduler)
调用时,输出变成了我所期望的,输出在整个执行过程中被合并。
对我来说很明显 RxJava 中的调用顺序很重要,但我不太明白在这种特殊情况下会发生什么。
那么当应用 onBackpressureBlock
运算符时会发生什么 before subscribeOn
如果 after 呢?
onBackpressureBlock
运算符是一个失败的实验;需要注意在哪里申请。例如,subscribeOn().onBackpressureBlock()
有效,但反之则不然。
RxJava 有称为 interval
的非阻塞周期性定时器,所以你不需要自己动手。
我正在玩弄 RxJava(准确地说是 RxKotlin)。这里我有以下 Observable
s:
fun metronome(ms: Int) = observable<Int> {
var i = 0;
while (true) {
if (ms > 0) {
Thread.sleep(ms.toLong())
}
if (it.isUnsubscribed()) {
break
}
it.onNext(++i)
}
}
我想合并其中的一些并同时 运行ning。它们忽略背压,因此必须对它们应用背压运算符。
然后我创建
val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)
然后我合并 metronome
s:
val o = Observable.merge(listOf(metronome(0),
metronome(1000).map { "---------" })
.map { it.onBackpressureBlock().subscribeOn(scheduler) })
.take(5000, TimeUnit.MILLISECONDS)
第一个应该不断地发射物品。 如果我在 运行 的最后 3 秒内这样做,我会得到以下输出:
...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
似乎 Observable
订阅了同一个线程,第一个 observable 被阻塞了 3 秒以上。
但是当我交换 onBackpressureBlock()
和 subscribeOn(scheduler)
调用时,输出变成了我所期望的,输出在整个执行过程中被合并。
对我来说很明显 RxJava 中的调用顺序很重要,但我不太明白在这种特殊情况下会发生什么。
那么当应用 onBackpressureBlock
运算符时会发生什么 before subscribeOn
如果 after 呢?
onBackpressureBlock
运算符是一个失败的实验;需要注意在哪里申请。例如,subscribeOn().onBackpressureBlock()
有效,但反之则不然。
RxJava 有称为 interval
的非阻塞周期性定时器,所以你不需要自己动手。