RXJava2中几种方法的组合
Combination of several methods in RXJava2
事实是,我需要同时从本地数据库、服务器提取数据,同时检查与 Internet 的连接。
不检查互联网很容易。但是当我关闭移动数据时,崩溃。
我不明白如何组合并决定这样做:
private void getCategories() {
composite.add(getDataFromLocal(context)
.observeOn(AndroidSchedulers.mainThread()).flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
@Override
public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
if (promoFilterResponse != null) {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
return combineDuplicatedCategories(promoFilterResponse);
} else {
return Observable.empty();
}
}
})
.subscribe(new Consumer<List<FilterCategory>>() {
@Override
public void accept(List<FilterCategory> categories) throws Exception {
if (mView != null) {
mView.hideConnectingProgress();
if (categories != null && categories.size() > 0) {
mView.onCategoriesReceived(categories);
}
}
}
}));
composite.add(InternetUtil.isConnectionAvailable().subscribe(isOnline -> {
if (isOnline) {
composite.add(
getDataFromServer(context)
.flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
@Override
public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
if (promoFilterResponse != null) {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
return combineDuplicatedCategories(promoFilterResponse);
} else {
return Observable.empty();
}
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(categories -> {
if (mView != null) {
mView.hideConnectingProgress();
if (categories != null && categories.size() > 0) {
mView.onCategoriesReceived(categories);
} else {
mView.onCategoriesReceivingFailure(errorMessage[0]);
}
}
}, throwable -> {
if (mView != null) {
if (throwable instanceof HttpException) {
ResponseBody body = ((HttpException) throwable).response().errorBody();
if (body != null) {
errorMessage[0] = body.string();
}
}
mView.hideConnectingProgress();
mView.onCategoriesReceivingFailure(errorMessage[0]);
}
}));
} else {
mView.hideConnectingProgress();
mView.showOfflineMessage();
}
}));
}
private Single<Boolean> checkNetwork(Context context) {
return InternetUtil.isConnectionAvailable()
.subscribeOn(Schedulers.io())
.doOnSuccess(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
getDataFromServer(context);
}
});
}
private Observable<PromoFilterResponse> getDataFromServer(Context context) {
return RetrofitHelper.getApiService()
.getFilterCategories(Constants.PROMO_FILTER_CATEGORIES_URL)
.subscribeOn(Schedulers.io())
.retryWhen(BaseDataManager.isAuthException())
.publish(networkResponse -> Observable.merge(networkResponse, getDataFromLocal(context).takeUntil(networkResponse)))
.doOnNext(new Consumer<PromoFilterResponse>() {
@Override
public void accept(PromoFilterResponse promoFilterResponse) throws Exception {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtil.e("ERROR", throwable.getMessage());
}
});
}
private Observable<PromoFilterResponse> getDataFromLocal(Context context) {
PromoFilterResponse response = PreferencesHelper.getObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, PromoFilterResponse.class);
if (response != null) {
return Observable.just(response)
.subscribeOn(Schedulers.io());
} else {
return Observable.empty();
}
}
如您所见,分别连接本地数据库,同时上网查询并从服务器上传数据。
不过我觉得不太对。而且订阅者重复等等。
看了很多教程,里面有介绍本地数据库与API结合使用,但是没看到同时处理与外网的连接错误。
我想很多人都遇到过这样的问题,你是如何解决的?
我不是 android 开发人员,但在我看来方法 return 类型应该是这样的:
//just for demonstration
static boolean isOnline = false;
static class NoInternet extends RuntimeException {
}
private static Completable ensureOnline() {
if (isOnline)
return Completable.complete();
else
return Completable.error(new NoInternet());
}
private static Single<String> getDataFromServer() {
return Single.just("From server");
}
private static Maybe<String> getDataFromLocal() {
return Maybe.just("From local");//or Maybe.never()
}
我们可以 运行 与 Observable.merge
并行。但是,如果发生错误 NoIternet
怎么办?合并的可观察对象将失败。我们可以使用 materialisation
- 将所有发射和误差转换为 onNext
值。
private static void loadData() {
Observable<Notification<String>> fromServer = ensureOnline().andThen(getDataFromServer()).toObservable().materialize();
Observable<Notification<String>> fromLocaldb = getDataFromLocal().toObservable().materialize();
Observable.merge(fromLocaldb, fromServer)
.subscribe(notification -> {
if (notification.isOnNext()) {
//calls one or two times(db+server || db || server)
//show data in ui
} else if (notification.isOnError()) {
if (notification.getError() instanceof NoInternet) {
//show no internet
} else {
//show another error
}
} else if (notification.isOnComplete()){
//hide progress bar
}
});
}
假设你有两个 Obsevable:一个来自服务器,另一个来自数据库
您可以将它们合并为一个流,如下所示:
public Observable<Joke> getAllJokes() {
Observable<Joke> remote = mRepository.getAllJokes()
.subscribeOn(Schedulers.io());
Observable<Joke> local = mRepository.getAllJokes().subscribeOn(Schedulers.io());
return Observable.mergeDelayError(local, remote).filter(joke -> joke != null);
}
事实是,我需要同时从本地数据库、服务器提取数据,同时检查与 Internet 的连接。
不检查互联网很容易。但是当我关闭移动数据时,崩溃。
我不明白如何组合并决定这样做:
private void getCategories() {
composite.add(getDataFromLocal(context)
.observeOn(AndroidSchedulers.mainThread()).flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
@Override
public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
if (promoFilterResponse != null) {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
return combineDuplicatedCategories(promoFilterResponse);
} else {
return Observable.empty();
}
}
})
.subscribe(new Consumer<List<FilterCategory>>() {
@Override
public void accept(List<FilterCategory> categories) throws Exception {
if (mView != null) {
mView.hideConnectingProgress();
if (categories != null && categories.size() > 0) {
mView.onCategoriesReceived(categories);
}
}
}
}));
composite.add(InternetUtil.isConnectionAvailable().subscribe(isOnline -> {
if (isOnline) {
composite.add(
getDataFromServer(context)
.flatMap(new Function<PromoFilterResponse, ObservableSource<List<FilterCategory>>>() {
@Override
public ObservableSource<List<FilterCategory>> apply(PromoFilterResponse promoFilterResponse) throws Exception {
if (promoFilterResponse != null) {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
return combineDuplicatedCategories(promoFilterResponse);
} else {
return Observable.empty();
}
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(categories -> {
if (mView != null) {
mView.hideConnectingProgress();
if (categories != null && categories.size() > 0) {
mView.onCategoriesReceived(categories);
} else {
mView.onCategoriesReceivingFailure(errorMessage[0]);
}
}
}, throwable -> {
if (mView != null) {
if (throwable instanceof HttpException) {
ResponseBody body = ((HttpException) throwable).response().errorBody();
if (body != null) {
errorMessage[0] = body.string();
}
}
mView.hideConnectingProgress();
mView.onCategoriesReceivingFailure(errorMessage[0]);
}
}));
} else {
mView.hideConnectingProgress();
mView.showOfflineMessage();
}
}));
}
private Single<Boolean> checkNetwork(Context context) {
return InternetUtil.isConnectionAvailable()
.subscribeOn(Schedulers.io())
.doOnSuccess(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
getDataFromServer(context);
}
});
}
private Observable<PromoFilterResponse> getDataFromServer(Context context) {
return RetrofitHelper.getApiService()
.getFilterCategories(Constants.PROMO_FILTER_CATEGORIES_URL)
.subscribeOn(Schedulers.io())
.retryWhen(BaseDataManager.isAuthException())
.publish(networkResponse -> Observable.merge(networkResponse, getDataFromLocal(context).takeUntil(networkResponse)))
.doOnNext(new Consumer<PromoFilterResponse>() {
@Override
public void accept(PromoFilterResponse promoFilterResponse) throws Exception {
PreferencesHelper.putObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, promoFilterResponse);
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtil.e("ERROR", throwable.getMessage());
}
});
}
private Observable<PromoFilterResponse> getDataFromLocal(Context context) {
PromoFilterResponse response = PreferencesHelper.getObject(context, PreferencesKey.FILTER_CATEGORIES_KEY, PromoFilterResponse.class);
if (response != null) {
return Observable.just(response)
.subscribeOn(Schedulers.io());
} else {
return Observable.empty();
}
}
如您所见,分别连接本地数据库,同时上网查询并从服务器上传数据。
不过我觉得不太对。而且订阅者重复等等。
看了很多教程,里面有介绍本地数据库与API结合使用,但是没看到同时处理与外网的连接错误。
我想很多人都遇到过这样的问题,你是如何解决的?
我不是 android 开发人员,但在我看来方法 return 类型应该是这样的:
//just for demonstration
static boolean isOnline = false;
static class NoInternet extends RuntimeException {
}
private static Completable ensureOnline() {
if (isOnline)
return Completable.complete();
else
return Completable.error(new NoInternet());
}
private static Single<String> getDataFromServer() {
return Single.just("From server");
}
private static Maybe<String> getDataFromLocal() {
return Maybe.just("From local");//or Maybe.never()
}
我们可以 运行 与 Observable.merge
并行。但是,如果发生错误 NoIternet
怎么办?合并的可观察对象将失败。我们可以使用 materialisation
- 将所有发射和误差转换为 onNext
值。
private static void loadData() {
Observable<Notification<String>> fromServer = ensureOnline().andThen(getDataFromServer()).toObservable().materialize();
Observable<Notification<String>> fromLocaldb = getDataFromLocal().toObservable().materialize();
Observable.merge(fromLocaldb, fromServer)
.subscribe(notification -> {
if (notification.isOnNext()) {
//calls one or two times(db+server || db || server)
//show data in ui
} else if (notification.isOnError()) {
if (notification.getError() instanceof NoInternet) {
//show no internet
} else {
//show another error
}
} else if (notification.isOnComplete()){
//hide progress bar
}
});
}
假设你有两个 Obsevable:一个来自服务器,另一个来自数据库
您可以将它们合并为一个流,如下所示:
public Observable<Joke> getAllJokes() {
Observable<Joke> remote = mRepository.getAllJokes()
.subscribeOn(Schedulers.io());
Observable<Joke> local = mRepository.getAllJokes().subscribeOn(Schedulers.io());
return Observable.mergeDelayError(local, remote).filter(joke -> joke != null);
}