从远程重用 RxJava 流 API

Reuse RxJava stream from a remote API

我有一个 API 调用,我想使用 Observable 包装它:

private Observable<RealmResults<Account>> getAccounts() {
        final Observable<RealmResults<Account>> realmAccounts =
                Observable.defer(new Func0<Observable<RealmResults<Account>>>() {
                    @Override
                    public Observable<RealmResults<Account>> call() {
                        return RealmObservable.results(getActivity(), new Func1<Realm, RealmResults<Account>>() {
                            @Override
                            public RealmResults<Account> call(Realm realm) {
                                return realm.where(Account.class).findAll();
                            }
                        });
                    }
                });

        return Observable
                .create(new Observable.OnSubscribe<RealmResults<Account>>() {
                    @Override
                    public void call(final Subscriber<? super RealmResults<Account>> subscriber) {
                        DataBridge.getAccounts(Preferences.getString(Constant.ME_GUID, ""), new OnResponseListener() {
                            @Override
                            public void OnSuccess(Object data) {
                                Log.d("Stream", "onSuccess");
                                realmAccounts.subscribe(subscriber);
                            }

                            @Override
                            public void onFailure(Object data) {
                                subscriber.onError(new Exception(data.toString()));
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .startWith(realmAccounts);
    }

我喜欢用它

Observable<Accounts> accounts = getAccounts().flatMap(
                new Func1<RealmResults<Account>, Observable<Account>>() {
                    @Override
                    public Observable<Account> call(RealmResults<Account> accounts) {
                        return Observable.from(accounts);
                    }
                });

如何多次使用 accounts observable 而无需每次都调用 API。我需要处理帐户流并从中提取不同的数据集。

最简单的方法是使用运算符cache, which internally uses ReplaySubject。它缓存源可观察项,然后从缓存中提供结果。

... 
Observable<<RealmResults<Account>> cachedResult = getAccounts().cache();
Observable<Accounts> accountsObservable = cachedResult.flatMap(...);
Observable<X> xObservable = cachedResult.flatMap(...);

如果您想避免缓存结果,您应该使用 Connectable Observables。通常它只对 Hot Observables 有意义。 Connectable observable 在其 Connect 方法被调用之前不会开始发射项目。您可以使用 publish 运算符转换为 Connectable Observable。

ConnectableObservable<<RealmResults<Account>> connectebleObservable = getAccounts().publish();
Observable<Accounts> accountsObservable = connectebleObservable .flatMap(...);
Observable<X> xObservable = connectebleObservable .flatMap(...);
//You must subscribe before connect
accountsObservable.subsribe(...);
xObservable.subscribe(...);
//start emiting data
connectebleObservable.connect();

这里要注意的是,你必须在连接之前订阅 - 以避免数据丢失 - 否则你必须使用重播运算符,它类似于缓存运算符,但用于可连接的可观察

那么分享呢?

它创建 ConnectableObservable 并将其作为常规 Observable 公开。首次订阅自动导致连接和发射。

在您的案例中使用的共享,不重放可能会导致数据丢失或多次执行,具体取决于时间。 例如,对于 2 个订阅者和流中的一个项目,您可能有以下情况:

  1. 2 个订阅在 onNext 之前创建 - 按预期工作。
  2. 第二个订阅在 onNext 之后但在 onComplete 之前创建 - 第二个订阅仅获得 onComplete
  3. 在 onComplete 之后创建的第二个订阅 - 2 次执行没有缓存