RxJava轮询+手动刷新

RxJava polling + manual refresh

我有一个列表想要每分钟刷新一次。 例如这里的用户列表:https://github.com/android10/Android-CleanArchitecture/blob/master/domain/src/main/java/com/fernandocejas/android10/sample/domain/interactor/GetUserList.java

我使用 repeatWhen 添加定期刷新:

  public Observable<List<User>> buildUseCaseObservable(Void unused) {
    return this.userRepository
        .users()
        .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
          @Override
          public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
            return objectObservable.delay(1, TimeUnit.MINUTES);
          }
        });
  }

这样很好用,每分钟调用一次onNext。 但是如果我想立即刷新这个列表(因为用户的操作或因为通知),我不知道如何执行。

我应该 cancel/dispose observable 并重新启动一个新的吗? 谢谢

如果您不关心在其他可观察对象之一发出数据后调整刷新时间,您可以执行以下操作:

    // Specific example of a user manually requesting
    val request = Observable.create<String> { emitter ->
        refresh.setOnClickListener {
            emitter.onNext("Click Request")
        }
    }
            .observeOn(Schedulers.io())
            .flatMap {
                userRepository.users()
            }

    // Refresh based off of your original work, could use something like interval as well
    val interval = userRepository.users()
            .subscribeOn(Schedulers.io())
            .repeatWhen { objectObservable ->
                objectObservable.delay(1, TimeUnit.MINUTES)
            }

    // Combine them so that both emissions are received you can even add on another source
    Observable.merge(request,interval)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                contents.text = it.toString()
            }, {
                contents.text = it.toString()
            },{
                println(contents.text)
            })

这样就不用每次都dispose重新订阅了

根据您的代码,我了解到用户列表是在订阅后生成和发出的。

这里有一些我能想到的解决方案,而不是取消订阅并重新订阅您想立即响应的事件:

  1. 不使用 repeatWhen 运算符,而是使用 interval creation operator 结合 flatMap 每分钟调用对新 Observable 的订阅,并使用 merge 运算符添加对您感兴趣的其他事件的反应。像这样:

    @Test
    public void intervalObservableAndImmediateReaction() throws InterruptedException {
        Observable<String> obs = Observable.interval(1, TimeUnit.SECONDS)
                    .cast(Object.class)                                          
                    .mergeWith(
                              Observable.just("mockedUserClick")
                                        .delay(500, TimeUnit.MILLISECONDS))
                    .flatMap(
                             timeOrClick -> Observable.just("Generated upon subscription")
                             );
    
        obs.subscribe(System.out::println);
        Thread.currentThread().sleep(3000); //to see the prints before ending the test
    }
    

    或根据您的需要调整(但原理是一样的):

    Observable.interval(1, TimeUnit.MINUTES)
                .mergeWith(RxView.clicks(buttonView))
                .flatMap(timeOrClick -> this.userRepository.users());
    
  2. 您可以像以前一样使用 flatMap 运算符,即使在保持当前实现工作并且不合并到间隔的情况下 - 只需保持工作代码在程序链的另一个区域它到您选择的RxBinding

    RxView.touches(yourViewVariable)
          .flatMatp(motionEvent -> this.userRepository.users())
          .subscribe(theObserver);
    

    请注意,在此解决方案中,订阅是独立于两个可观察对象完成的。如果你使用不同的观察者,或者管理一个主题或那条线上的东西,你可能会过得更好。我 运行 的一个小测试显示一个订阅者处理订阅 2 个不同的 observables 没有问题(在 Rxjava1 中 - 还没有签入 Rxjava2),但我觉得很不稳定。