如何正确地将背压应用于 PublishSubject?
How to properly apply backpressure to PublishSubject?
我有一个 PublishSubject
发出位置更新 (LatLng
):
val location = PublishSubject.create<LatLng>()
现在,我想对这个位置做点什么,这可能需要一些时间,应该按顺序完成:
location
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe {
// ... CPU-heavy operation
}
只有在subscribe
中的每个操作完成后,才能进行下一个运行。此外,每次更新都会使之前的更新过时,所以我只对最新的值感兴趣。因此,订阅者在每次更新时应该只收到当前最新的值。因此,我想到了施加背压:
location
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.LATEST)
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
不幸的是,这不起作用,因为发出的每个 LatLng
都已传送给订阅者,并且没有值被跳过,这是应该的。
问题是observeOn
总是可以缓冲至少一个元素。您可以通过 delay
实现所需的效果:
location
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.SECONDS, Schedulers.computation())
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
补充说明:
subscribeOn
对Subject
没有实际影响,可以忽略
toFlowable
最好靠近源头应用,以避免 delay
被不必要的项目淹没。
- 如果您不介意第 3 方运营商,observeOnLatest 运营商可以做同样的事情。
我有一个 PublishSubject
发出位置更新 (LatLng
):
val location = PublishSubject.create<LatLng>()
现在,我想对这个位置做点什么,这可能需要一些时间,应该按顺序完成:
location
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe {
// ... CPU-heavy operation
}
只有在subscribe
中的每个操作完成后,才能进行下一个运行。此外,每次更新都会使之前的更新过时,所以我只对最新的值感兴趣。因此,订阅者在每次更新时应该只收到当前最新的值。因此,我想到了施加背压:
location
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.LATEST)
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
不幸的是,这不起作用,因为发出的每个 LatLng
都已传送给订阅者,并且没有值被跳过,这是应该的。
问题是observeOn
总是可以缓冲至少一个元素。您可以通过 delay
实现所需的效果:
location
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.SECONDS, Schedulers.computation())
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
补充说明:
subscribeOn
对Subject
没有实际影响,可以忽略toFlowable
最好靠近源头应用,以避免delay
被不必要的项目淹没。- 如果您不介意第 3 方运营商,observeOnLatest 运营商可以做同样的事情。