RxJava - 订阅者只运行一次

RxJava -Do subscribers only run once

我对订阅者以及他们对观察者的反应有些困惑。可以说我有以下简单的观察者和一个执行操作的订阅者:

Observable.just(preferences.getBoolean(C"vibrate", false))
            .subscribeOn(Schedulers.io())//observe on new thread
            .observeOn(AndroidSchedulers.mainThread()) //subscribe(listen) on main thread
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean shouldVibrate) {
                    if (shouldVibrate)
                        Toast.makeText(context,"i should vibrate now",Toast.SHORT).show();
                }
            });

我意识到当第一次看到这段代码时,观察者会立即被调用。但是,如果之后再次更改共享首选项怎么办,此代码会自动再次 运行 还是每次我调用 subscribe 时仅 运行 ?如果每次更改共享首选项时我都希望它 运行 怎么办(有点像观察者)。

这真的取决于可观察的。我建议阅读 "Hot" and "Cold" Observables 反应式 Observable 文档。

在您的例子中,这是一个 Cold observable。每次订阅它都会重新订阅。但是,您只能订阅一次。您的代码片段实际上会阻止偏好获取(可能不是一个大问题),但它只会发出一个偏好。

我建议在 RxJava 的 RxAndroid 扩展库中使用 ContentObservable class,您已经在使用它(因为 AndroidSchedulers)。

它看起来像这样(这是餐巾纸背面的代码,我没有编译或 运行 这个):

// Defer the observable so it gets a fresh preference value. Also, we'll
// be using it a few times.
final Observable<Boolean> vibratePreference = Observable.defer(
    new Func0<Observable<Boolean>>() {

      @Override
      public Observable<Boolean> call() {
        return Observable.just(preferences.getBoolean("vibrate", false));
      }

    });

vibratePreference
    .concatWith(ContentObservable.fromSharedPreferencesChanges(preferences)
        // Only consider changes to the vibrate preference.
        .filter(new Func1<String, Boolean>() {

          @Override
          public Boolean call(final String key) {
            return "vibrate".equals(key);
          }

        })
        // Each time the preference changes, get the latest value.
        .flatMap(new Func1<String, Observable<Boolean>>() {

          @Override
          public Observable<Boolean>(final String unusedKey) {
            return vibratePreference;
          }

        }))
    .scheduleOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe( /* ...and so on. */ );

此外,如果您在 activity 或片段上执行此操作,我强烈建议您查看 RxAndroid 中 AppObservable 中的 bindActivitybindFragment 以制作确保将此可观察对象绑定到生命周期。您可能还想存储一个 CompositeSubscription,您可以在 onPause 中将其清空并在 onResume 中恢复订阅。这些有点离题,但很可能很快就会有用。