RxJava Subject with Backpressure - 只有在下游完成消费后才发出最后一个值

RxJava Subject with Backpressure - only let the last value emit once downstream has finished consuming

我有一个在某些 UI 事件上调用 onNext() 的 PublishSubject。订阅者通常需要 2 秒来完成其工作。我需要在用户忙时忽略对 onNext() 的所有呼叫,但最后一个呼叫除外。我尝试了以下方法,但是我无法控制流程。请求似乎排队,每个请求都得到处理(因此背压似乎不起作用)。我怎样才能让它忽略除最后一个请求之外的所有请求? (我不想使用 debounce,因为代码需要立即做出反应,任何合理的小超时都不起作用)。

此外,我意识到对主题使用 subscribeOn 没有任何效果,因此我使用 observeOn 在其中一个运算符中进行异步工作。这是正确的做法吗?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.LATEST)
  .observeOn(AndroidSchedulers.mainThread())
  .map(discarded -> {
    // PRE-LOADING
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
    return discarded;
   })
   .observeOn(Schedulers.computation())
   .map(b -> {
     Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
     Thread.sleep(2000);
     return b;
   })
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(b -> {
      Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
   });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

我看到的输出是:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

相反,我希望代码执行以下操作(即加载一次,并且在加载时,背压以阻止所有请求并在第一个观察者完成后发出最后一个请求 - 所以总的来说理想情况下最多只加载两次):

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

您不能使用 observeOn 执行此操作,因为它会缓冲至少 1 个元素,因此如果已经有一个 "LOADING" 发生,则始终执行 "PRE-LOADING" 阶段。

但是您可以使用 delay 来做到这一点,因为它不会操纵链上的请求数量,而是在调度程序上单独安排每个 onNext,而无需自行排队:

public static void main(String[] args) throws Exception {
    Subject<Boolean> loadingQueue = 
         PublishSubject.<Boolean>create().toSerialized();

    loadingQueue
      .toFlowable(BackpressureStrategy.LATEST)
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())        // <-------
      .map(discarded -> {
        // PRE-LOADING
        System.out.println("PRE-LOADING: " 
             + Thread.currentThread().getName());
        return discarded;
       })
       .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation())  // <-------
       .map(b -> {
           System.out.println("LOADING: " 
             + Thread.currentThread().getName());
         Thread.sleep(2000);
         return b;
       })
       .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())       // <-------
       .rebatchRequests(1)             // <----------------------------------- one-by-one
       .subscribe(b -> {
           System.out.println("FINISHED: " 
               + Thread.currentThread().getName() + "\n\n");
       });


    loadingQueue.onNext(true);
    loadingQueue.onNext(true);
    loadingQueue.onNext(true);

    Thread.sleep(10000);
}