如何在 Rxjava2 中获取实际的最新背压事件? Flowable.onBackpressureLatest() 未按预期工作

How to get the actual latest event on back pressure in Rxjava2? Flowable.onBackpressureLatest() not worked as expected

当生产者生产事件的速度快于客户消费的速度时。

我想使用 Flowable with onBackpressureLatest(),我可以获得最新发出的事件。

但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的日期事件。

那么我怎样才能得到实际的最新事件呢?

示例代码如下:

Flowable.interval(40, TimeUnit.MILLISECONDS)
            .doOnNext{
                println("doOnNext $it")
            }
            .onBackpressureLatest()
            .observeOn(Schedulers.single())
            .subscribe {
                println("subscribe $it")
                Thread.sleep(100)
            }

我的预期:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...

我得到了什么:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7
    subscribe   3
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   4
...
doOnNext    325
    subscribe   127
doOnNext    326
doOnNext    327
doOnNext    328
    subscribe   246
...

您的问题实际上出在 observeOn,默认情况下最多请求 128 个项目并将此请求传递给 backpressureLatest,因此其行为与您预期的不同。

您可以使用 .observeOn(Scheduler,boolean,int) 来指定缓冲区大小,它应该修复您所看到的行为。