如何在 Rxjava2 中获取实际的最新背压事件? Flowable.onBackpressureLatest() 未按预期工作
How to get the actual latest event on back pressure in Rxjava2? Flowable.onBackpressureLatest() not worked as expected
当生产者生产事件的速度快于客户消费的速度时。
我想使用 Flowable with onBackpressureLatest(),我可以获得最新发出的事件。
但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的日期事件。
那么我怎样才能得到实际的最新事件呢?
示例代码如下:
Flowable.interval(40, TimeUnit.MILLISECONDS)
.doOnNext{
println("doOnNext $it")
}
.onBackpressureLatest()
.observeOn(Schedulers.single())
.subscribe {
println("subscribe $it")
Thread.sleep(100)
}
我的预期:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 2
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 5
doOnNext 6
doOnNext 7
subscribe 7
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 10
...
我得到了什么:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 1
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 2
doOnNext 6
doOnNext 7
subscribe 3
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 4
...
doOnNext 325
subscribe 127
doOnNext 326
doOnNext 327
doOnNext 328
subscribe 246
...
您的问题实际上出在 observeOn
,默认情况下最多请求 128 个项目并将此请求传递给 backpressureLatest
,因此其行为与您预期的不同。
您可以使用 .observeOn(Scheduler,boolean,int)
来指定缓冲区大小,它应该修复您所看到的行为。
当生产者生产事件的速度快于客户消费的速度时。
我想使用 Flowable with onBackpressureLatest(),我可以获得最新发出的事件。
但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的日期事件。
那么我怎样才能得到实际的最新事件呢?
示例代码如下:
Flowable.interval(40, TimeUnit.MILLISECONDS)
.doOnNext{
println("doOnNext $it")
}
.onBackpressureLatest()
.observeOn(Schedulers.single())
.subscribe {
println("subscribe $it")
Thread.sleep(100)
}
我的预期:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 2
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 5
doOnNext 6
doOnNext 7
subscribe 7
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 10
...
我得到了什么:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 1
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 2
doOnNext 6
doOnNext 7
subscribe 3
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 4
...
doOnNext 325
subscribe 127
doOnNext 326
doOnNext 327
doOnNext 328
subscribe 246
...
您的问题实际上出在 observeOn
,默认情况下最多请求 128 个项目并将此请求传递给 backpressureLatest
,因此其行为与您预期的不同。
您可以使用 .observeOn(Scheduler,boolean,int)
来指定缓冲区大小,它应该修复您所看到的行为。