了解可流动的背压rxjava2
Understanding flowable backpressure rxjava2
我把这个虚拟例子放在一起试图更好地理解backpressure
:
Flowable.range(1, 100).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSubscriber<Int>() {
override fun onStart() {
request(1)
}
override fun onComplete() {
Log.d(this@MainActivity::class.java.simpleName, "onComplete")
}
override fun onNext(t: Int?) {
Log.d(this@MainActivity::class.java.simpleName, t.toString())
Thread.sleep(1000)
request(1)
}
override fun onError(t: Throwable?) { //handle error}
})
我有一个非常慢的 Subscriber
消耗来自非常快的 Flowable
的数据。我正在指示 Flowable onBackPressureDrop()
。尽管如此,我的输出看起来像这样(从 1 到 100)
07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.105 22389-22389 D: 99
07-16 23:07:25.105 22389-22389 D: 100
07-16 23:07:25.107 22389-22389 D: onComplete
由于订阅者速度极慢,我期待缺少元素,但事实并非如此,从 1 到 100 的所有数字都打印到控制台,每秒一个。
接下来,我尝试一次请求所有值。因此,我将 onStart
中的 request(1)
替换为 request(Long.MAX_VALUE)
,并从 onNext
调用中删除了 request(1)
。但它仍然打印 1 到 100 的数字,没有丢失任何元素。
所以我想知道如何为慢速订阅者模拟订阅者丢失事件?
如何使背压异常发生?
谢谢
observeOn
的默认内部缓冲区大小为 128,这就是为什么您看不到元素被丢弃的原因,因为它可以简单地缓冲您正在生成的所有 100 个元素。您可以通过 observeOn(mainThread(), false, 1)
将缓冲区大小设置为 1,然后体验下降。
我把这个虚拟例子放在一起试图更好地理解backpressure
:
Flowable.range(1, 100).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSubscriber<Int>() {
override fun onStart() {
request(1)
}
override fun onComplete() {
Log.d(this@MainActivity::class.java.simpleName, "onComplete")
}
override fun onNext(t: Int?) {
Log.d(this@MainActivity::class.java.simpleName, t.toString())
Thread.sleep(1000)
request(1)
}
override fun onError(t: Throwable?) { //handle error}
})
我有一个非常慢的 Subscriber
消耗来自非常快的 Flowable
的数据。我正在指示 Flowable onBackPressureDrop()
。尽管如此,我的输出看起来像这样(从 1 到 100)
07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.105 22389-22389 D: 99
07-16 23:07:25.105 22389-22389 D: 100
07-16 23:07:25.107 22389-22389 D: onComplete
由于订阅者速度极慢,我期待缺少元素,但事实并非如此,从 1 到 100 的所有数字都打印到控制台,每秒一个。
接下来,我尝试一次请求所有值。因此,我将 onStart
中的 request(1)
替换为 request(Long.MAX_VALUE)
,并从 onNext
调用中删除了 request(1)
。但它仍然打印 1 到 100 的数字,没有丢失任何元素。
所以我想知道如何为慢速订阅者模拟订阅者丢失事件? 如何使背压异常发生?
谢谢
observeOn
的默认内部缓冲区大小为 128,这就是为什么您看不到元素被丢弃的原因,因为它可以简单地缓冲您正在生成的所有 100 个元素。您可以通过 observeOn(mainThread(), false, 1)
将缓冲区大小设置为 1,然后体验下降。