了解可流动的背压rxjava2

Understanding flowable backpressure rxjava2

我把这个虚拟例子放在一起试图更好地理解backpressure

Flowable.range(1, 100).onBackpressureDrop()
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribeWith(object : DisposableSubscriber<Int>() {
                        override fun onStart() {
                          request(1)
                        }

                        override fun onComplete() {
                          Log.d(this@MainActivity::class.java.simpleName, "onComplete")
                        }

                        override fun onNext(t: Int?) {
                          Log.d(this@MainActivity::class.java.simpleName, t.toString())
                          Thread.sleep(1000)
                          request(1)
                        }

                        override fun onError(t: Throwable?) { //handle error}
                      })

我有一个非常慢的 Subscriber 消耗来自非常快的 Flowable 的数据。我正在指示 Flowable onBackPressureDrop()。尽管如此,我的输出看起来像这样(从 1 到 100)

07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.105 22389-22389 D: 99
07-16 23:07:25.105 22389-22389 D: 100
07-16 23:07:25.107 22389-22389 D: onComplete

由于订阅者速度极慢,我期待缺少元素,但事实并非如此,从 1 到 100 的所有数字都打印到控制台,每秒一个。

接下来,我尝试一次请求所有值。因此,我将 onStart 中的 request(1) 替换为 request(Long.MAX_VALUE),并从 onNext 调用中删除了 request(1)。但它仍然打印 1 到 100 的数字,没有丢失任何元素。

所以我想知道如何为慢速订阅者模拟订阅者丢失事件? 如何使背压异常发生?

谢谢

observeOn 的默认内部缓冲区大小为 128,这就是为什么您看不到元素被丢弃的原因,因为它可以简单地缓冲您正在生成的所有 100 个元素。您可以通过 observeOn(mainThread(), false, 1) 将缓冲区大小设置为 1,然后体验下降。