依次执行 rx.Obseravables
Executing rx.Obseravables secuentially
我正在使用 Fernando Ceja 的简洁架构开发 Android 应用程序。我的交互者或用例之一负责获取用户的提要数据。为了获取数据,首先我必须从数据库中检索用户的团队 table,然后我必须从服务器端获取 Feed 列表。
这就是我从数据库层获取 Teams 的方式:
mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
@Override
public void onNext(List<SimpleTeam> simpleTeams) {
super.onNext(simpleTeams);
mTeams = simpleTeams;
}
});
TeamCache 基本上只是另一个交互器,负责获取数据库中的所有团队。
以下是我从服务器端获取 Feed 数据的方法:
mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
@Override
protected void onServerSideError(Throwable errorResponse) {
callback.onFeedFetchFailed(...);
}
@Override
protected void onSuccess(List<ApiFeedResponse> responseBody) {
//Do stuff with mTeams
callback.onFeedFetched(...);
}
});
我的 GetFeedInteractor class 有一个名为 execute 的方法,我在其中传递稍后在 UI 中使用的回调来处理响应。所有这一切的问题是目前我正在像这样链接响应:
@Override
public void execute(final Callback callback, String userSipId) {
mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
@Override
public void onNext(List<SimpleTeam> simpleTeams) {
super.onNext(simpleTeams);
mTeams = simpleTeams;
getFeedFromRepository(callback);
}
});
}
public void getFeedFromRepository(final Callback callback) {
mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
@Override
protected void onServerSideError(Throwable errorResponse) {
callback.onFeedFetchFailed("failed");
}
@Override
protected void onSuccess(List<ApiFeedResponse> responseBody) {
//Do stuff with mTeams
List<BaseFeedItem> responseList = new ArrayList();
for (ApiFeedResponse apiFeedResponse : responseBody) {
responseList.add(FeedDataMapper.transform(apiFeedResponse));
}
callback.onFeedFetched(responseList);
}
});
}
如您所见,一旦我从缓存交互器中获取团队集合,我就会调用从同一个订阅者获取提要的方法。我不喜欢这个。我希望能够做一些更好的事情,比如使用 Observable.concat(getTeamsFromCache(), getFeedFromRepository());将调用链接到订阅者内的另一个 rx.Observable 并不是一件好事。我想我的问题是,如何链接两个使用不同订阅者的 rx.Observable?
更新:
ServerSubscriber 是我实现订阅 Retrofit 服务的订阅者。它只是检查错误代码和一些东西。这里是:
https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5
默认订阅者是一个空的默认订阅者。这里是:
https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330
TeamCache#getAllTeams() returns rx.Observable>
FeedRepository#getFeed(int page, int offset) returns rx.Observable>
更新 2:
这是获取用户提要的交互器现在的样子:
@Override
public void execute(final Callback callback, int offset, int pageSize) {
User user = mGetLoggedUser.get();
String userSipid = mUserSipid.get();
mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
.onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
.mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
.subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
@Override
public void onNext(List<BaseFeedItem> baseFeedItems) {
callback.onFeedFetched(baseFeedItems);
}
@Override
public void onError(Throwable e) {
if (e instanceof ServerSideException) {
//Handle the http error
} else if (e instanceof DBException) {
//Handle the database cache error
} else {
//Handle generic error
}
}
});
}
我认为你错过了 RxJava 和反应式方法的要点,你不应该有不同的具有 OO 层次结构和回调的订阅者。
您应该构建单独的 Observables
来发出它处理的特定数据,没有 Subscriber
,然后您可以根据需要链接您 Observable
,最后,您有订阅者对链式 Observable
流预期的最终结果作出反应。
像这样(使用 lambda 来获得更精简的代码):
TeamCache mTeamCache = new TeamCache();
FeedRepository mFeedRepository = new FeedRepository();
Observable.zip(teamsObservable, feedObservable, Pair::new)
.subscribe(resultPair -> {
//Do stuff with mTeams
List<BaseFeedItem> responseList = new ArrayList();
for (ApiFeedResponse apiFeedResponse : resultPair.second) {
responseList.add(FeedDataMapper.transform(apiFeedResponse));
}
}, throwable -> {
//handle errors
}
);
我使用的是 zip
而不是 concat
,因为您似乎在这里有 2 个独立的调用,您想要等待它们完成('zip' 它们一起完成)然后采取行动,但当然,因为您已经分离了 Observables
流,您可以根据需要将它们以不同的方式链接在一起。
至于你的 ServerSubscriber
和所有响应验证逻辑,它也应该是 rxify,所以你可以沿着你的服务器 Observable
流来组合它。
类似这样的东西(为了简化而发出的一些逻辑,因为我不熟悉它......)
Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
.flatMap(apiFeedsResponse -> {
if (apiFeedsResponse.code() != 200) {
if (apiFeedsResponse.code() == 304) {
List<ApiFeedResponse> body = apiFeedsResponse.body();
return Observable.just(body);
//onNotModified(o.body());
} else {
return Observable.error(new ServerSideErrorException(apiFeedsResponse));
}
} else {
//onServerSideResponse(o.body());
return Observable.just(apiFeedsResponse.body());
}
});
我正在使用 Fernando Ceja 的简洁架构开发 Android 应用程序。我的交互者或用例之一负责获取用户的提要数据。为了获取数据,首先我必须从数据库中检索用户的团队 table,然后我必须从服务器端获取 Feed 列表。
这就是我从数据库层获取 Teams 的方式:
mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
@Override
public void onNext(List<SimpleTeam> simpleTeams) {
super.onNext(simpleTeams);
mTeams = simpleTeams;
}
});
TeamCache 基本上只是另一个交互器,负责获取数据库中的所有团队。
以下是我从服务器端获取 Feed 数据的方法:
mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
@Override
protected void onServerSideError(Throwable errorResponse) {
callback.onFeedFetchFailed(...);
}
@Override
protected void onSuccess(List<ApiFeedResponse> responseBody) {
//Do stuff with mTeams
callback.onFeedFetched(...);
}
});
我的 GetFeedInteractor class 有一个名为 execute 的方法,我在其中传递稍后在 UI 中使用的回调来处理响应。所有这一切的问题是目前我正在像这样链接响应:
@Override
public void execute(final Callback callback, String userSipId) {
mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
@Override
public void onNext(List<SimpleTeam> simpleTeams) {
super.onNext(simpleTeams);
mTeams = simpleTeams;
getFeedFromRepository(callback);
}
});
}
public void getFeedFromRepository(final Callback callback) {
mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
@Override
protected void onServerSideError(Throwable errorResponse) {
callback.onFeedFetchFailed("failed");
}
@Override
protected void onSuccess(List<ApiFeedResponse> responseBody) {
//Do stuff with mTeams
List<BaseFeedItem> responseList = new ArrayList();
for (ApiFeedResponse apiFeedResponse : responseBody) {
responseList.add(FeedDataMapper.transform(apiFeedResponse));
}
callback.onFeedFetched(responseList);
}
});
}
如您所见,一旦我从缓存交互器中获取团队集合,我就会调用从同一个订阅者获取提要的方法。我不喜欢这个。我希望能够做一些更好的事情,比如使用 Observable.concat(getTeamsFromCache(), getFeedFromRepository());将调用链接到订阅者内的另一个 rx.Observable 并不是一件好事。我想我的问题是,如何链接两个使用不同订阅者的 rx.Observable?
更新:
ServerSubscriber 是我实现订阅 Retrofit 服务的订阅者。它只是检查错误代码和一些东西。这里是:
https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5
默认订阅者是一个空的默认订阅者。这里是:
https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330
TeamCache#getAllTeams() returns rx.Observable> FeedRepository#getFeed(int page, int offset) returns rx.Observable>
更新 2:
这是获取用户提要的交互器现在的样子:
@Override
public void execute(final Callback callback, int offset, int pageSize) {
User user = mGetLoggedUser.get();
String userSipid = mUserSipid.get();
mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
.onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
.mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
.subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
@Override
public void onNext(List<BaseFeedItem> baseFeedItems) {
callback.onFeedFetched(baseFeedItems);
}
@Override
public void onError(Throwable e) {
if (e instanceof ServerSideException) {
//Handle the http error
} else if (e instanceof DBException) {
//Handle the database cache error
} else {
//Handle generic error
}
}
});
}
我认为你错过了 RxJava 和反应式方法的要点,你不应该有不同的具有 OO 层次结构和回调的订阅者。
您应该构建单独的 Observables
来发出它处理的特定数据,没有 Subscriber
,然后您可以根据需要链接您 Observable
,最后,您有订阅者对链式 Observable
流预期的最终结果作出反应。
像这样(使用 lambda 来获得更精简的代码):
TeamCache mTeamCache = new TeamCache();
FeedRepository mFeedRepository = new FeedRepository();
Observable.zip(teamsObservable, feedObservable, Pair::new)
.subscribe(resultPair -> {
//Do stuff with mTeams
List<BaseFeedItem> responseList = new ArrayList();
for (ApiFeedResponse apiFeedResponse : resultPair.second) {
responseList.add(FeedDataMapper.transform(apiFeedResponse));
}
}, throwable -> {
//handle errors
}
);
我使用的是 zip
而不是 concat
,因为您似乎在这里有 2 个独立的调用,您想要等待它们完成('zip' 它们一起完成)然后采取行动,但当然,因为您已经分离了 Observables
流,您可以根据需要将它们以不同的方式链接在一起。
至于你的 ServerSubscriber
和所有响应验证逻辑,它也应该是 rxify,所以你可以沿着你的服务器 Observable
流来组合它。
类似这样的东西(为了简化而发出的一些逻辑,因为我不熟悉它......)
Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
.flatMap(apiFeedsResponse -> {
if (apiFeedsResponse.code() != 200) {
if (apiFeedsResponse.code() == 304) {
List<ApiFeedResponse> body = apiFeedsResponse.body();
return Observable.just(body);
//onNotModified(o.body());
} else {
return Observable.error(new ServerSideErrorException(apiFeedsResponse));
}
} else {
//onServerSideResponse(o.body());
return Observable.just(apiFeedsResponse.body());
}
});