RxJava - 订阅者只运行一次
RxJava -Do subscribers only run once
我对订阅者以及他们对观察者的反应有些困惑。可以说我有以下简单的观察者和一个执行操作的订阅者:
Observable.just(preferences.getBoolean(C"vibrate", false))
.subscribeOn(Schedulers.io())//observe on new thread
.observeOn(AndroidSchedulers.mainThread()) //subscribe(listen) on main thread
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean shouldVibrate) {
if (shouldVibrate)
Toast.makeText(context,"i should vibrate now",Toast.SHORT).show();
}
});
我意识到当第一次看到这段代码时,观察者会立即被调用。但是,如果之后再次更改共享首选项怎么办,此代码会自动再次 运行 还是每次我调用 subscribe 时仅 运行 ?如果每次更改共享首选项时我都希望它 运行 怎么办(有点像观察者)。
这真的取决于可观察的。我建议阅读 "Hot" and "Cold" Observables 反应式 Observable 文档。
在您的例子中,这是一个 Cold observable。每次订阅它都会重新订阅。但是,您只能订阅一次。您的代码片段实际上会阻止偏好获取(可能不是一个大问题),但它只会发出一个偏好。
我建议在 RxJava 的 RxAndroid 扩展库中使用 ContentObservable class,您已经在使用它(因为 AndroidSchedulers)。
它看起来像这样(这是餐巾纸背面的代码,我没有编译或 运行 这个):
// Defer the observable so it gets a fresh preference value. Also, we'll
// be using it a few times.
final Observable<Boolean> vibratePreference = Observable.defer(
new Func0<Observable<Boolean>>() {
@Override
public Observable<Boolean> call() {
return Observable.just(preferences.getBoolean("vibrate", false));
}
});
vibratePreference
.concatWith(ContentObservable.fromSharedPreferencesChanges(preferences)
// Only consider changes to the vibrate preference.
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(final String key) {
return "vibrate".equals(key);
}
})
// Each time the preference changes, get the latest value.
.flatMap(new Func1<String, Observable<Boolean>>() {
@Override
public Observable<Boolean>(final String unusedKey) {
return vibratePreference;
}
}))
.scheduleOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe( /* ...and so on. */ );
此外,如果您在 activity 或片段上执行此操作,我强烈建议您查看 RxAndroid 中 AppObservable
中的 bindActivity
和 bindFragment
以制作确保将此可观察对象绑定到生命周期。您可能还想存储一个 CompositeSubscription
,您可以在 onPause
中将其清空并在 onResume
中恢复订阅。这些有点离题,但很可能很快就会有用。
我对订阅者以及他们对观察者的反应有些困惑。可以说我有以下简单的观察者和一个执行操作的订阅者:
Observable.just(preferences.getBoolean(C"vibrate", false))
.subscribeOn(Schedulers.io())//observe on new thread
.observeOn(AndroidSchedulers.mainThread()) //subscribe(listen) on main thread
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean shouldVibrate) {
if (shouldVibrate)
Toast.makeText(context,"i should vibrate now",Toast.SHORT).show();
}
});
我意识到当第一次看到这段代码时,观察者会立即被调用。但是,如果之后再次更改共享首选项怎么办,此代码会自动再次 运行 还是每次我调用 subscribe 时仅 运行 ?如果每次更改共享首选项时我都希望它 运行 怎么办(有点像观察者)。
这真的取决于可观察的。我建议阅读 "Hot" and "Cold" Observables 反应式 Observable 文档。
在您的例子中,这是一个 Cold observable。每次订阅它都会重新订阅。但是,您只能订阅一次。您的代码片段实际上会阻止偏好获取(可能不是一个大问题),但它只会发出一个偏好。
我建议在 RxJava 的 RxAndroid 扩展库中使用 ContentObservable class,您已经在使用它(因为 AndroidSchedulers)。
它看起来像这样(这是餐巾纸背面的代码,我没有编译或 运行 这个):
// Defer the observable so it gets a fresh preference value. Also, we'll
// be using it a few times.
final Observable<Boolean> vibratePreference = Observable.defer(
new Func0<Observable<Boolean>>() {
@Override
public Observable<Boolean> call() {
return Observable.just(preferences.getBoolean("vibrate", false));
}
});
vibratePreference
.concatWith(ContentObservable.fromSharedPreferencesChanges(preferences)
// Only consider changes to the vibrate preference.
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(final String key) {
return "vibrate".equals(key);
}
})
// Each time the preference changes, get the latest value.
.flatMap(new Func1<String, Observable<Boolean>>() {
@Override
public Observable<Boolean>(final String unusedKey) {
return vibratePreference;
}
}))
.scheduleOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe( /* ...and so on. */ );
此外,如果您在 activity 或片段上执行此操作,我强烈建议您查看 RxAndroid 中 AppObservable
中的 bindActivity
和 bindFragment
以制作确保将此可观察对象绑定到生命周期。您可能还想存储一个 CompositeSubscription
,您可以在 onPause
中将其清空并在 onResume
中恢复订阅。这些有点离题,但很可能很快就会有用。