RxJava2、OrmLite — 一个 Observable、多个订阅者 - 为所有订阅者重复数据

RxJava2, OrmLite — One Observable, Multiple Subscribers - Repeat Data for all subscribers

我使用 ormlite 并将数据库数据转换为 rx Observable。

public static <T> Observable<T> createObservable(@NonNull Observable<T> observable) {
    return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}


public Observable<List<Country>> getCountriesObservable() {
    return RxUtils.createObservable(new Observable<List<Country>>() {
        @Override
        protected void subscribeActual(Observer<? super List<Country>> observer) {
            try {
                List<Country> list = mCountryDao.getCountries();
                observer.onNext(list == null ? Collections.<Country>emptyList() : list);
                observer.onComplete();
            } catch (SQLException exc) {
                Log.e(TAG, exc.getMessage());
                observer.onError(exc);
            }
        }
    });
}

我有三个不同的 classes A、B、C。在这个 classes 中我创建了三个观察者。我订阅了 observable。

observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);

之后我更改了 class A 中的数据并更新了数据库。 有没有办法为所有观察者说可观察的重复新数据?

或者我需要重新编写这段代码?

observable.subscribe(observerA);
observable.subscribe(observerB);
observable.subscribe(observerC);

免责声明:我没有直接使用 RxJava 的经验,只有 Rx.NET、RxJS 和 RxSwift(而且我已经 10 多年没有做过 Java 开发了)

首先,我建议在订阅之前对可观察对象使用 share() 运算符。这确保您不会做额外的工作:

Observable<List<Country>> sharedObservable = observable.share();

sharedObservable.subscribe(observerA);
sharedObservable.subscribe(observerB);
sharedObservable.subscribe(observerC);

一个可观察对象是一个数据流。现在它只包含一个值,但你可以让它 return 多个值。当然,所有订阅者都会收到进来的新值。

您需要通知数据已更新,然后在更新时生成新值。在可观察对象中任意创建值的一种简单方法是使用 Subject。主题有点"un-Rx-y",但为了简单起见,我将在这里使用一个:

// All this inside the same class

private PublishSubject<Object> updatesSubject = new PublishSubject<Object>();

public void SignalUpdate(){
    updatesSubject.onNext(new Object());
}

public Observable<List<Country>> getCountriesObservable() {
    return updatesSubject
        .startWith(new Object)
        .flatMap(_ -> RxUtils.createObservable(new Observable<List<Country>>() {
            @Override
            protected void subscribeActual(Observer<? super List<Country>> observer) {
                try {
                    List<Country> list = mCountryDao.getCountries();
                    observer.onNext(list == null ? Collections.<Country>emptyList() : list);
                    observer.onComplete();
                } catch (SQLException exc) {
                    Log.e(TAG, exc.getMessage());
                    observer.onError(exc);
                }
            }
    });
}

每次调用 signalUpdate() 时都会 return 一个 List<Country>。为确保您在首次订阅时获得价值,我们调用 startWith() 将立即发出给定值并启动数据的第一次加载。