依次执行 rx.Obseravables

Executing rx.Obseravables secuentially

我正在使用 Fernando Ceja 的简洁架构开发 Android 应用程序。我的交互者或用例之一负责获取用户的提要数据。为了获取数据,首先我必须从数据库中检索用户的团队 table,然后我必须从服务器端获取 Feed 列表。

这就是我从数据库层获取 Teams 的方式:

    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
        }
    });

TeamCache 基本上只是另一个交互器,负责获取数据库中的所有团队。

以下是我从服务器端获取 Feed 数据的方法:

    mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed(...);
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            callback.onFeedFetched(...);
        }
    });

我的 GetFeedInteractor class 有一个名为 execute 的方法,我在其中传递稍后在 UI 中使用的回调来处理响应。所有这一切的问题是目前我正在像这样链接响应:

@Override
public void execute(final Callback callback, String userSipId) {
    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
            getFeedFromRepository(callback);
        }
    });
}

public void getFeedFromRepository(final Callback callback) {
    mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed("failed");
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList();

            for (ApiFeedResponse apiFeedResponse : responseBody) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }

            callback.onFeedFetched(responseList);
        }
    });
}

如您所见,一旦我从缓存交互器中获取团队集合,我就会调用从同一个订阅者获取提要的方法。我不喜欢这个。我希望能够做一些更好的事情,比如使用 Observable.concat(getTeamsFromCache(), getFeedFromRepository());将调用链接到订阅者内的另一个 rx.Observable 并不是一件好事。我想我的问题是,如何链接两个使用不同订阅者的 rx.Observable?

更新:

ServerSubscriber 是我实现订阅 Retrofit 服务的订阅者。它只是检查错误代码和一些东西。这里是:

https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5

默认订阅者是一个空的默认订阅者。这里是:

https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330

TeamCache#getAllTeams() returns rx.Observable> FeedRepository#getFeed(int page, int offset) returns rx.Observable>

更新 2:

这是获取用户提要的交互器现在的样子:

@Override
    public void execute(final Callback callback, int offset, int pageSize) {
        User user = mGetLoggedUser.get();
        String userSipid = mUserSipid.get();

        mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
                .onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
                .mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
                .subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
                    @Override
                    public void onNext(List<BaseFeedItem> baseFeedItems) {
                        callback.onFeedFetched(baseFeedItems);
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (e instanceof ServerSideException) {
                            //Handle the http error
                        } else if (e instanceof DBException) {
                            //Handle the database cache error
                        } else {
                            //Handle generic error
                        }
                    }
                });
    }

我认为你错过了 RxJava 和反应式方法的要点,你不应该有不同的具有 OO 层次结构和回调的订阅者。
您应该构建单独的 Observables 来发出它处理的特定数据,没有 Subscriber,然后您可以根据需要链接您 Observable,最后,您有订阅者对链式 Observable 流预期的最终结果作出反应。

像这样(使用 lambda 来获得更精简的代码):

TeamCache mTeamCache = new TeamCache();
FeedRepository mFeedRepository = new FeedRepository();

Observable.zip(teamsObservable, feedObservable, Pair::new)
    .subscribe(resultPair -> {
            //Do stuff with mTeams
            List<BaseFeedItem> responseList = new ArrayList(); 
            for (ApiFeedResponse apiFeedResponse : resultPair.second) {
                responseList.add(FeedDataMapper.transform(apiFeedResponse));
            }
        }, throwable -> {
             //handle errors
           }
     );

我使用的是 zip 而不是 concat,因为您似乎在这里有 2 个独立的调用,您想要等待它们完成('zip' 它们一起完成)然后采取行动,但当然,因为您已经分离了 Observables 流,您可以根据需要将它们以不同的方式链接在一起。

至于你的 ServerSubscriber 和所有响应验证逻辑,它也应该是 rxify,所以你可以沿着你的服务器 Observable 流来组合它。

类似这样的东西(为了简化而发出的一些逻辑,因为我不熟悉它......)

Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
            .flatMap(apiFeedsResponse -> {
                if (apiFeedsResponse.code() != 200) {
                    if (apiFeedsResponse.code() == 304) {
                        List<ApiFeedResponse> body = apiFeedsResponse.body();
                        return Observable.just(body);
                        //onNotModified(o.body());
                    } else {
                        return Observable.error(new ServerSideErrorException(apiFeedsResponse));
                    }
                } else {
                    //onServerSideResponse(o.body());
                    return Observable.just(apiFeedsResponse.body());
                }
            });