RxJava2 中 onBackpressureBuffer 的行为是什么

What's the behavior of onBackpressureBuffer in RxJava2

我想做的是让一个 Flowable 带有一个项目的背压缓冲区,以保持从流中产生的最新项目。

我尝试使用 Flowable.onBackpressureBuffer(1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)。然而,它并没有像我预期的那样工作

  Flowable.range(0, 10_000)
      .onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
      .observeOn(Schedulers.computation())
      .subscribe {
        println(it)
        Thread.sleep(5)
      }

我预期的输出是一个整数序列,不一定是连续的,应该包括最后一项 9,999。但是,它只打印了前几个连续的数字,如 0、1、2、3、4...,每次都不同,但不是最后一个数字 9,999。

我正在使用下面的代码,它最后总是打印 9999。它首先打印连续的数字(直到 127),然后打印 9999。也许在您的情况下,主执行线程结束的时间比处理打印数字的线程早得多。为了打印直到 9999 的所有数字,我尝试将背压缓冲区更改为 10000(并将主线程休眠设置为更高的值),这显然确保了打印所有数字,因为缓冲区非常大。

public class FlowableTest {

    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub

        Flowable.range(0, 10_000).onBackpressureBuffer(1, () -> {
        }, BackpressureOverflowStrategy.DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(it -> {
            System.out.println(it);
            Thread.sleep(5);
        });

        Thread.sleep(50000); // wait the main program sufficient time to let the other threads end

    }