使用 RxJava 进行服务器轮询 - 在获得服务器响应后重复

Server polling using RxJava - repeat after geting server response

目前我正在尝试使用 RxJava 实现服务器轮询,我已经研究了如何在收到服务器响应后重复整个链,我尝试过使用 repeat(),它可以工作但不是那么完美,原因是它使 api 调用了很多次,服务器在发送给客户端之前需要额外的时间来处理数据,但我们不知道确切的时间,所以我们无法使用 repeatWhen() 来指定具体时间。我唯一可以使用的是在 api 响应之后等待。

如有任何建议,我们将不胜感激!

以下是代码片段:

retrofitService.requestPolling()
              .repeat()   // do not wait to call server so many times
              .takeUntil(new Func1<PollResponse, Boolean>() {
                      @Override
                      public Boolean call(PollResponse pollResponse) {
                            return pollResponse.mComplete;
                       }
               })
             .doOnNext(new Action1<FlightSearchPollResponse>() {
                @Override
                public void call(pollResponse pollResponse) {
                      // update UI here
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<PollResponse>() {
                @Override
                public void onCompleted() {

                    }
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(PollResponse pollResponse) {

            } );

编辑:我是 RxJava 的新手,刚刚得到这个名为 BackPressure 的主题,并且有很多文章解释了如何处理它,因为我不想缓存这个响应,似乎 Subject 是一个不错的选择,它允许您控制何时拉动。

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html

感谢@Gary LO

应该有很多方法。我想分享其中之一。

  1. 创建单独的信号流PublishSubject pollingSignal
  2. 将信号转换为 api 呼叫
  3. 发布信号再做一次。

    final PublishSubject<Boolean> pollingSignal = PublishSubject.create();
    
    final Observable<PollResponse> apiResponse = retrofitService.requestPolling();
    
    pollingSignal
      .flatMap(x -> apiResponse)
      .subscribe(new Observer<PollResponse>() {
        @Override
        public void onCompleted() {}
    
        @Override
        public void onError(Throwable throwable) {}
    
        @Override
        public void onNext(PollResponse integer) {
          // start the next polling 
          pollingSignal.onNext(true);
      }
    });
    
    // start the first polling 
    pollingSignal.onNext(true);
    

玩得开心!

注释 使用 PublishSubject<Boolean> 而不是 PublishSubject<Void> 是因为我不习惯使用 pollingSignal.onNext(null).

但在 Kotlin 中,我可以将 PublishSubject<Unit>pollingSignal.onNext(Unit)

一起使用