RxJava2 中 onBackpressureBuffer 的行为是什么
What's the behavior of onBackpressureBuffer in RxJava2
我想做的是让一个 Flowable 带有一个项目的背压缓冲区,以保持从流中产生的最新项目。
我尝试使用 Flowable.onBackpressureBuffer(1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)。然而,它并没有像我预期的那样工作
Flowable.range(0, 10_000)
.onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe {
println(it)
Thread.sleep(5)
}
我预期的输出是一个整数序列,不一定是连续的,应该包括最后一项 9,999。但是,它只打印了前几个连续的数字,如 0、1、2、3、4...,每次都不同,但不是最后一个数字 9,999。
我正在使用下面的代码,它最后总是打印 9999。它首先打印连续的数字(直到 127),然后打印 9999。也许在您的情况下,主执行线程结束的时间比处理打印数字的线程早得多。为了打印直到 9999 的所有数字,我尝试将背压缓冲区更改为 10000(并将主线程休眠设置为更高的值),这显然确保了打印所有数字,因为缓冲区非常大。
public class FlowableTest {
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
Flowable.range(0, 10_000).onBackpressureBuffer(1, () -> {
}, BackpressureOverflowStrategy.DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(it -> {
System.out.println(it);
Thread.sleep(5);
});
Thread.sleep(50000); // wait the main program sufficient time to let the other threads end
}
我想做的是让一个 Flowable 带有一个项目的背压缓冲区,以保持从流中产生的最新项目。
我尝试使用 Flowable.onBackpressureBuffer(1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)。然而,它并没有像我预期的那样工作
Flowable.range(0, 10_000)
.onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe {
println(it)
Thread.sleep(5)
}
我预期的输出是一个整数序列,不一定是连续的,应该包括最后一项 9,999。但是,它只打印了前几个连续的数字,如 0、1、2、3、4...,每次都不同,但不是最后一个数字 9,999。
我正在使用下面的代码,它最后总是打印 9999。它首先打印连续的数字(直到 127),然后打印 9999。也许在您的情况下,主执行线程结束的时间比处理打印数字的线程早得多。为了打印直到 9999 的所有数字,我尝试将背压缓冲区更改为 10000(并将主线程休眠设置为更高的值),这显然确保了打印所有数字,因为缓冲区非常大。
public class FlowableTest {
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
Flowable.range(0, 10_000).onBackpressureBuffer(1, () -> {
}, BackpressureOverflowStrategy.DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(it -> {
System.out.println(it);
Thread.sleep(5);
});
Thread.sleep(50000); // wait the main program sufficient time to let the other threads end
}