如何正确地将背压应用于 PublishSubject?

How to properly apply backpressure to PublishSubject?

我有一个 PublishSubject 发出位置更新 (LatLng):

val location = PublishSubject.create<LatLng>()

现在,我想对这个位置做点什么,这可能需要一些时间,应该按顺序完成:

location
    .observeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe {
        // ... CPU-heavy operation
    }

只有在subscribe中的每个操作完成后,才能进行下一个运行。此外,每次更新都会使之前的更新过时,所以我只对最新的值感兴趣。因此,订阅者在每次更新时应该只收到当前最新的值。因此,我想到了施加背压:

location
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeWith(object : DisposableSubscriber<LatLng>() {
        override fun onStart() {
            request(1)
        }

        override fun onError(t: Throwable) {
            // ...
        }

        override fun onComplete() {
            // ...
        }

        override fun onNext(t: LatLng) {
            // ... CPU-heavy task
            request(1)
        }
    })

不幸的是,这不起作用,因为发出的每个 LatLng 都已传送给订阅者,并且没有值被跳过,这是应该的。

问题是observeOn总是可以缓冲至少一个元素。您可以通过 delay 实现所需的效果:

location
    .toFlowable(BackpressureStrategy.LATEST)
    .delay(0, TimeUnit.SECONDS, Schedulers.computation())
    .subscribeWith(object : DisposableSubscriber<LatLng>() {
        override fun onStart() {
            request(1)
        }

        override fun onError(t: Throwable) {
            // ...
        }

        override fun onComplete() {
            // ...
        }

        override fun onNext(t: LatLng) {
            // ... CPU-heavy task
            request(1)
        }
    })

补充说明:

  1. subscribeOnSubject没有实际影响,可以忽略
  2. toFlowable 最好靠近源头应用,以避免 delay 被不必要的项目淹没。
  3. 如果您不介意第 3 方运营商,observeOnLatest 运营商可以做同样的事情。