RxJava2、OrmLite — 一个 Observable、多个订阅者 - 为所有订阅者重复数据
RxJava2, OrmLite — One Observable, Multiple Subscribers - Repeat Data for all subscribers
我使用 ormlite 并将数据库数据转换为 rx Observable。
public static <T> Observable<T> createObservable(@NonNull Observable<T> observable) {
return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}
public Observable<List<Country>> getCountriesObservable() {
return RxUtils.createObservable(new Observable<List<Country>>() {
@Override
protected void subscribeActual(Observer<? super List<Country>> observer) {
try {
List<Country> list = mCountryDao.getCountries();
observer.onNext(list == null ? Collections.<Country>emptyList() : list);
observer.onComplete();
} catch (SQLException exc) {
Log.e(TAG, exc.getMessage());
observer.onError(exc);
}
}
});
}
我有三个不同的 classes A、B、C。在这个 classes 中我创建了三个观察者。我订阅了 observable。
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
之后我更改了 class A 中的数据并更新了数据库。
有没有办法为所有观察者说可观察的重复新数据?
或者我需要重新编写这段代码?
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
免责声明:我没有直接使用 RxJava 的经验,只有 Rx.NET、RxJS 和 RxSwift(而且我已经 10 多年没有做过 Java 开发了)
首先,我建议在订阅之前对可观察对象使用 share()
运算符。这确保您不会做额外的工作:
Observable<List<Country>> sharedObservable = observable.share();
sharedObservable.subscribe(observerA);
sharedObservable.subscribe(observerB);
sharedObservable.subscribe(observerC);
一个可观察对象是一个数据流。现在它只包含一个值,但你可以让它 return 多个值。当然,所有订阅者都会收到进来的新值。
您需要通知数据已更新,然后在更新时生成新值。在可观察对象中任意创建值的一种简单方法是使用 Subject
。主题有点"un-Rx-y",但为了简单起见,我将在这里使用一个:
// All this inside the same class
private PublishSubject<Object> updatesSubject = new PublishSubject<Object>();
public void SignalUpdate(){
updatesSubject.onNext(new Object());
}
public Observable<List<Country>> getCountriesObservable() {
return updatesSubject
.startWith(new Object)
.flatMap(_ -> RxUtils.createObservable(new Observable<List<Country>>() {
@Override
protected void subscribeActual(Observer<? super List<Country>> observer) {
try {
List<Country> list = mCountryDao.getCountries();
observer.onNext(list == null ? Collections.<Country>emptyList() : list);
observer.onComplete();
} catch (SQLException exc) {
Log.e(TAG, exc.getMessage());
observer.onError(exc);
}
}
});
}
每次调用 signalUpdate()
时都会 return 一个 List<Country>
。为确保您在首次订阅时获得价值,我们调用 startWith()
将立即发出给定值并启动数据的第一次加载。
我使用 ormlite 并将数据库数据转换为 rx Observable。
public static <T> Observable<T> createObservable(@NonNull Observable<T> observable) {
return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}
public Observable<List<Country>> getCountriesObservable() {
return RxUtils.createObservable(new Observable<List<Country>>() {
@Override
protected void subscribeActual(Observer<? super List<Country>> observer) {
try {
List<Country> list = mCountryDao.getCountries();
observer.onNext(list == null ? Collections.<Country>emptyList() : list);
observer.onComplete();
} catch (SQLException exc) {
Log.e(TAG, exc.getMessage());
observer.onError(exc);
}
}
});
}
我有三个不同的 classes A、B、C。在这个 classes 中我创建了三个观察者。我订阅了 observable。
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
之后我更改了 class A 中的数据并更新了数据库。 有没有办法为所有观察者说可观察的重复新数据?
或者我需要重新编写这段代码?
observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);
免责声明:我没有直接使用 RxJava 的经验,只有 Rx.NET、RxJS 和 RxSwift(而且我已经 10 多年没有做过 Java 开发了)
首先,我建议在订阅之前对可观察对象使用 share()
运算符。这确保您不会做额外的工作:
Observable<List<Country>> sharedObservable = observable.share();
sharedObservable.subscribe(observerA);
sharedObservable.subscribe(observerB);
sharedObservable.subscribe(observerC);
一个可观察对象是一个数据流。现在它只包含一个值,但你可以让它 return 多个值。当然,所有订阅者都会收到进来的新值。
您需要通知数据已更新,然后在更新时生成新值。在可观察对象中任意创建值的一种简单方法是使用 Subject
。主题有点"un-Rx-y",但为了简单起见,我将在这里使用一个:
// All this inside the same class
private PublishSubject<Object> updatesSubject = new PublishSubject<Object>();
public void SignalUpdate(){
updatesSubject.onNext(new Object());
}
public Observable<List<Country>> getCountriesObservable() {
return updatesSubject
.startWith(new Object)
.flatMap(_ -> RxUtils.createObservable(new Observable<List<Country>>() {
@Override
protected void subscribeActual(Observer<? super List<Country>> observer) {
try {
List<Country> list = mCountryDao.getCountries();
observer.onNext(list == null ? Collections.<Country>emptyList() : list);
observer.onComplete();
} catch (SQLException exc) {
Log.e(TAG, exc.getMessage());
observer.onError(exc);
}
}
});
}
每次调用 signalUpdate()
时都会 return 一个 List<Country>
。为确保您在首次订阅时获得价值,我们调用 startWith()
将立即发出给定值并启动数据的第一次加载。