如何使用 RxJava Observer 而不是事件总线来处理 401、403、503,500 等 HTTP 错误
How do I handle HTTP errors like 401, 403, 503,500 using RxJava Observer instead of Event Bus
我正在使用 Retrofit
、OK-HTTP
和 RxJava2
来处理网络调用,我创建了下面的拦截器来处理每个网络调用的网络错误响应,有没有更好的处理这个的方法?
EventBus 是这样吗?
我不想在每个方法中检查这个错误异常,
//HTTP 客户端
OkHttpClient tempClient = new OkHttpClient.Builder()
.readTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// connect timeout
.connectTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// socket timeout
.followRedirects(false)
.cache(provideHttpCache())
.addNetworkInterceptor(new ResponseCodeCheckInterceptor())
.addNetworkInterceptor(new ResponseCacheInterceptor())
.addInterceptor(new AddHeaderAndCookieInterceptor())
.build();
HTTP 客户端拦截器
public class ResponseCodeCheckInterceptor implements Interceptor {
private static final String TAG = "RespCacheInterceptor";
@Override
public Response intercept(Chain chain) throws IOException {
Response response = chain.proceed(chain.request());
Request originalRequest = chain.request();
if (response.code() == HttpStatus.UNAUTHORIZED.value()) {
throw new UnAuthorizedException();
}else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
throw new APIException(response.code(), "Server Internal Error");
} else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
throw new ServiceUnavailableException();
} else {
throw new APIException(code,response.body().toString());
}
return response;
}
}
API Class
@GET("customer/account/")
Single<Customer> getCustomer();
......
存储库Class
@Override
public Single<Customer> getCustomer() {
return this.mCustomerRemoteDataStore.getCustomer()
.doOnSuccess(new Consumer<Customer>() {
@Override
public void accept(Customer customer) throws Exception {
if (customer != null) {
mCustomerLocalDataStore.saveCustomer(customer);
}
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
}
主持人Class
@Override
public void getCustomerFullDetails() {
checkViewAttached();
getView().showLoading();
addSubscription(customerRepository.getCustomerFullDetails(true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<CustomerDetails>() {
@Override
public void onSuccess(@io.reactivex.annotations.NonNull CustomerDetails customerDetails) {
getView().onCustomerDetailsSuccess();
}
@Override
public void onError(@io.reactivex.annotations.NonNull Throwable throwable) {
Log.d(TAG, "error: " + throwable.getLocalizedMessage());
if (throwable instanceof UnAuthorizedException) {
getView().showLoginPage();
else if (throwable instanceof ServiceUnavailableException) {
getView().showServiceUnAvaiableMsg();
}...
}
})
);
}
更新代码
============
public class CheckConnectivityInterceptor implements Interceptor {
private static final String TAG = CheckConnectivityInterceptor.class.getSimpleName() ;
private boolean isNetworkActive;
private RxEventBus eventBus;
private Context mContext;
public CheckConnectivityInterceptor(RxEventBus eventBus, Context mContext) {
this.mContext = mContext;
this.eventBus = eventBus;
}
@Override
public Response intercept(Interceptor.Chain chain) throws IOException {
Request originalRequest = chain.request();
String requestPath = originalRequest.url().url().getPath();
if (!NetworkUtil.isConnected(this.mContext)) {
eventBus.send(new ErrorState(ErrorType.NO_INTERNET_CONNECTION,
this.mContext.getString(R.string.no_network_connection), requestPath));
//Added this exception so it's not trying to execute the chain
throw new NoConnectivityException();
} else {
Response originalResponse = null;
try {
originalResponse = chain.proceed(chain.request());
} catch (Exception ex) {
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, this.mContext.getString(R.string.connection_failed), requestPath));
Log.e(TAG, "check connectivity intercept: ",ex );
throw new IOException("IO Exception occurred");
}
return originalResponse;
}
}
}
====================
public class HTTPResponseCodeCheckInterceptor implements Interceptor {
private RxEventBus eventBus;
public HTTPResponseCodeCheckInterceptor(RxEventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public Response intercept(Chain chain) throws IOException {
if (!responseSuccess) {
if (code == HttpStatus.MOVED_TEMPORARILY.value()) {
eventBus.send(new ErrorState(ErrorType.STEP_UP_AUTHENTICATION,requestPath,rSecureCode));
} else if (code == HttpStatus.INTERNAL_SERVER_ERROR.value()) { // Error code 500
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, getAPIError(responseStringOrig),requestPath));
} else if (code == HttpStatus.SERVICE_UNAVAILABLE.value()) {
eventBus.send(new ErrorState(ErrorType.SERVICE_UNAVAILABLE, getOutageMessage(responseStringOrig),requestPath));
} else {
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR,new APIErrorResponse(500, "Internal Server Error"),requestPath));
}
}
}
}
===================
public class RxEventBus {
private PublishSubject<ErrorState> bus = PublishSubject.create();
private RxEventBus() {
}
private static class SingletonHolder {
private static final RxEventBus INSTANCE = new RxEventBus();
}
public static RxEventBus getBus() {
return RxEventBus.SingletonHolder.INSTANCE;
}
public void send(ErrorState o) {
bus.onNext(o);
}
public Observable<ErrorState> toObserverable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
public static void register(Object subscriber) {
//bus.register(subscriber);
}
public static void unregister(Object subscriber) {
// bus.unregister(subscriber);
}
}
=====================
public class BasePresenter<V extends MVPView> implements MVPPresenter<V> {
private final CompositeDisposable mCompositeDisposable;
@Override
public void subscribe() {
initRxBus();
}
@Override
public void unsubscribe() {
RxUtil.unsubscribe(mCompositeDisposable);
}
public void addSubscription(Disposable disposable){
if(mCompositeDisposable != null){
mCompositeDisposable.add(disposable);
Log.d(TAG, "addSubscription: "+mCompositeDisposable.size());
}
}
private void initRxBus() {
addSubscription(EPGApplication.getAppInstance().eventBus()
.toObserverable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ErrorState>() {
@Override
public void accept(ErrorState errorState) throws Exception {
if (mMvpView != null) {
mMvpView.hideLoadingIndicator();
if (ErrorType.STEP_UP_AUTHENTICATION == errorState.getType()) {
mMvpView.showStepUpAuthentication(errorState.getSecureRequestCode());
} else if (ErrorType.SERVICE_ERROR == errorState.getType()) {
mMvpView.showServiceError(((APIErrorResponse) errorState.getErrorData()).getErrorMessage());
} else if (ErrorType.SERVICE_UNAVAILABLE == errorState.getType()) {
mMvpView.showServiceUnavailable(((OutageBody) errorState.getErrorData()));
} else if (ErrorType.UNAUTHORIZED == errorState.getType()) {
mMvpView.sessionTokenExpiredRequestLogin();
} else if (ErrorType.GEO_BLOCK == errorState.getType()) {
mMvpView.showGeoBlockErrorMessage();
} else if (ErrorType.SESSION_EXPIRED == errorState.getType()) {
mMvpView.sessionTokenExpiredRequestLogin();
}else if (ErrorType.NO_INTERNET_CONNECTION == errorState.getType()) {
mMvpView.showNoNetworkConnectivityMessage();
mMvpView.showServiceError(resourceProvider.getString(R.string.no_network_connection));
}
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "base excpetion: ", throwable);
}
}));
}
}
}
==================
public class ProfilePresenter<V extends ProfileContract.View> extends BasePresenter<V>
implements ProfileContract.Presenter<V> {
public ProfilePresenter(ProfileContract.View view, CustomerRepository repository) {
super();
this.repository = repository;
}
private void updateCustomerAccountDetails(JSONObject payload) {
getMvpView().showLoadingIndicator();
addSubscription(repository.updateCustomerDetails(sharedPreferencesRepository.isStepUpAuthRequired(), AppConfig.CUSTOMER_ACCOUNT_HOLDER_UPDATE
, payload)
.compose(RxUtil.applySingleSchedulers())
.subscribeWith(new DisposableSingleObserver<BaseServerResponse>() {
@Override
public void onSuccess(BaseServerResponse response) {
if (!isViewAttached()) {
return;
}
getMvpView().hideLoadingIndicator();
getMvpView().onSuccessProfileInfoUpdate();
}
@Override
public void onError(Throwable throwable) {
if (!isViewAttached()) {
return;
}
if (throwable instanceof NoConnectivityException) {
getMvpView().showNoNetworkConnectivityMessage();
}
getMvpView().hideLoadingIndicator();
}
}));
}
}
我过去使用的一种方法是共享一个 Subject
用于传达错误情况。
在生产者方面,持有对它的引用 Subject
类型(Publish
/Behavior
/Replay
/等)以便使用应用程序发生的下一个错误调用 onNext()
。在这种情况下,生产者将是 ResponseCodeCheckInterceptor
实例。这个实例不会抛出各种异常,而是发出一个 ErrorState
来描述刚刚发生的错误。 (假设 ErrorState
是一个自定义类型,它携带了关于错误条件的足够信息,供消费者决定如何做出反应,例如更新 UI、clean-up 资源等)。
在消费者端,将对共享 Subject
的引用保存为 Observable<ErrorState>
。正如您似乎在做 MVP,演示者可能是您的消费者之一。
(依赖注入是共享 Subject
实例的好方法)。
希望对您有所帮助!
更新一些粗略的示例代码...
// this is descriptive way to identify unique error types you may care about
enum ErrorType {
Unauthorized,
ServiceUnavailable,
ServiceError
}
// this class should bundle together enough information about an
// error that has occurred so consumers can decide how to respond
// e.g. is this a "fatal" error, or can the associated operation be
// retried?
class ErrorState {
public final ErrorType type;
public final boolean fatal;
ErrorState(ErrorType type, boolean fatal) {
this.type = type;
this.fatal = fatal;
}
}
// the Interceptor creates new instances of ErrorState and pushes
// them through the Subject to notify downstream subscribers
class ResponseCodeCheckInterceptor implements Interceptor {
private final Subject<ErrorState> errorStateSubject;
ResponseCodeCheckInterceptor(Subject<ErrorState> errorStateSubject) {
this.errorStateSubject = errorStateSubject;
}
@Override
public Response intercept(@NonNull Chain chain) throws IOException {
final Response response = chain.proceed(chain.request());
if(response.code() == HttpStatus.UNAUTHORIZED.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.Unauthorized, false));
} else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.ServiceError, true));
} else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.ServiceUnavailable, false));
}
return response;
}
}
// the Presenter is created with a reference to the same Subject, but
// retains a reference to it as an Observable. the Consumer instance
// supplied as the onNext() handler is where you'd put your logic to
// handle ErrorStates. also, the Presenter should be lifecycle-aware
// so as to create and dispose of subscriptions at the appropriate
// times.
class Presenter {
private final Observable<ErrorState> errorStateStream;
private Disposable errorStateSubscription;
class Presenter(Observable<ErrorState> errorStateStream) {
this.errorStateStream = errorStateStream
}
public void onStart() {
errorStateSubscription = errorStateStream.subscribe(
next -> {
/* Invoke views/etc */
},
error -> {
/* handle stream error during */
}
);
}
public void onStop() {
if(errorStateSubscription != null) {
errorStateSubscription.dispose();
}
}
}
Observable.just(new Object())
.subscribeOn(Schedulers.io())
.subscribe(new DefaultObserver<Object>() {
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
if(e instanceof HttpException) {
HttpException httpException = (HttpException) e;
if(httpException.code() == 400)
Log.d(TAG, "onError: BAD REQUEST");
else if(httpException.code() == 401)
Log.d(TAG, "onError: NOT AUTHORIZED");
else if(httpException.code() == 403)
Log.d(TAG, "onError: FORBIDDEN");
else if(httpException.code() == 404)
Log.d(TAG, "onError: NOT FOUND");
else if(httpException.code() == 500)
Log.d(TAG, "onError: INTERNAL SERVER ERROR");
else if(httpException.code() == 502)
Log.d(TAG, "onError: BAD GATEWAY");
}
}
@Override
public void onComplete() {
}
});
请注意,所有代码为 2xx 的回复都将在 onNext
中使用,代码以 4xx 开头5xx 将在 onError
.
中消耗
仅供参考。在这种情况下不需要 ResponseCodeCheckInterceptor
。试试不使用自定义拦截器,这应该可以解决问题。
更新
自定义观察者
public abstract class CustomObserver extends DefaultObserver implements Observer{
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
if(e instanceof HttpException) {
HttpException httpException = (HttpException) e;
if(httpException.code() == 400)
onBadRequest(e);
else if(httpException.code() == 401)
onNotAuthorized(e);
else if(httpException.code() == 502)
onBadGateway(e);
}
}
public abstract void onNotAuthorized(Throwable e);
public abstract void onBadGateway(Throwable e);
public abstract void onBadRequest(Throwable e);
@Override
public void onComplete() {
}
}
实施
Observable.just(new Object())
.subscribeOn(Schedulers.io())
.subscribe(new CustomObserver() {
@Override
public void onNext(Object o) {
super.onNext(o);
}
@Override
public void onError(Throwable e) {
super.onError(e);
}
@Override
public void onNotAuthorized(Throwable e) {
}
@Override
public void onBadGateway(Throwable e) {
}
@Override
public void onBadRequest(Throwable e) {
}
@Override
public void onComplete() {
super.onComplete();
}
});
Kotlin 方式
如果你使用 Kotlin 和 MVVM,我建议另一种简单的方法
您可以像下面这样使用 Kotlin 扩展:
fun <T> Single<T>.handelNetworkError() =
onErrorResumeNext { e ->
when (e) {
is OfflineException -> return@onErrorResumeNext Single.error(Exception("check your internet connection "))
is SocketTimeoutException -> return@onErrorResumeNext Single.error(Exception("server not fount"))
is retrofit2.HttpException -> {
val responseBody = e.response().errorBody()
///if you want more custom error:
/* when(e.response().code()){
500 -> return@onErrorResumeNext Single.error( customException500(responseBody?.run { getErrorMessage(responseBody) }))
402 -> return@onErrorResumeNext Single.error(customException402(responseBody?.run { getErrorMessage(responseBody) }))
.... -> } */
return@onErrorResumeNext Single.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
}
is IOException -> return@onErrorResumeNext Single.error(Exception("Network error"))
else -> return@onErrorResumeNext Single.error(e)
}
}
fun <T> Observable<T>.handelNetworkError() =
onErrorResumeNext { e : Throwable ->
when (e) {
is OfflineException -> return@onErrorResumeNext Observable.error(Exception("check your internet connection"))
is SocketTimeoutException -> return@onErrorResumeNext Observable.error(Exception("server not fount"))
is retrofit2.HttpException -> {
val responseBody = e.response().errorBody()
return@onErrorResumeNext Observable.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
}
is IOException -> return@onErrorResumeNext Observable.error(Exception("Network error"))
else -> return@onErrorResumeNext Observable.error(e)
}
}
fun getErrorMessage(responseBody: ResponseBody): String? {
return try {
val jsonObject = JSONObject(responseBody.string())
jsonObject.getString("message")
} catch (e: Exception) {
e.message
}
}
class OfflineException : IOException() {
override val message: String?
get() = "no internet!"
}
用法:
api.getUserList().handelNetworkError().subscribe { }
我正在使用 Retrofit
、OK-HTTP
和 RxJava2
来处理网络调用,我创建了下面的拦截器来处理每个网络调用的网络错误响应,有没有更好的处理这个的方法?
EventBus 是这样吗?
我不想在每个方法中检查这个错误异常,
//HTTP 客户端
OkHttpClient tempClient = new OkHttpClient.Builder()
.readTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// connect timeout
.connectTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// socket timeout
.followRedirects(false)
.cache(provideHttpCache())
.addNetworkInterceptor(new ResponseCodeCheckInterceptor())
.addNetworkInterceptor(new ResponseCacheInterceptor())
.addInterceptor(new AddHeaderAndCookieInterceptor())
.build();
HTTP 客户端拦截器
public class ResponseCodeCheckInterceptor implements Interceptor {
private static final String TAG = "RespCacheInterceptor";
@Override
public Response intercept(Chain chain) throws IOException {
Response response = chain.proceed(chain.request());
Request originalRequest = chain.request();
if (response.code() == HttpStatus.UNAUTHORIZED.value()) {
throw new UnAuthorizedException();
}else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
throw new APIException(response.code(), "Server Internal Error");
} else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
throw new ServiceUnavailableException();
} else {
throw new APIException(code,response.body().toString());
}
return response;
}
}
API Class
@GET("customer/account/")
Single<Customer> getCustomer();
......
存储库Class
@Override
public Single<Customer> getCustomer() {
return this.mCustomerRemoteDataStore.getCustomer()
.doOnSuccess(new Consumer<Customer>() {
@Override
public void accept(Customer customer) throws Exception {
if (customer != null) {
mCustomerLocalDataStore.saveCustomer(customer);
}
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
}
主持人Class
@Override
public void getCustomerFullDetails() {
checkViewAttached();
getView().showLoading();
addSubscription(customerRepository.getCustomerFullDetails(true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSingleObserver<CustomerDetails>() {
@Override
public void onSuccess(@io.reactivex.annotations.NonNull CustomerDetails customerDetails) {
getView().onCustomerDetailsSuccess();
}
@Override
public void onError(@io.reactivex.annotations.NonNull Throwable throwable) {
Log.d(TAG, "error: " + throwable.getLocalizedMessage());
if (throwable instanceof UnAuthorizedException) {
getView().showLoginPage();
else if (throwable instanceof ServiceUnavailableException) {
getView().showServiceUnAvaiableMsg();
}...
}
})
);
}
更新代码 ============
public class CheckConnectivityInterceptor implements Interceptor {
private static final String TAG = CheckConnectivityInterceptor.class.getSimpleName() ;
private boolean isNetworkActive;
private RxEventBus eventBus;
private Context mContext;
public CheckConnectivityInterceptor(RxEventBus eventBus, Context mContext) {
this.mContext = mContext;
this.eventBus = eventBus;
}
@Override
public Response intercept(Interceptor.Chain chain) throws IOException {
Request originalRequest = chain.request();
String requestPath = originalRequest.url().url().getPath();
if (!NetworkUtil.isConnected(this.mContext)) {
eventBus.send(new ErrorState(ErrorType.NO_INTERNET_CONNECTION,
this.mContext.getString(R.string.no_network_connection), requestPath));
//Added this exception so it's not trying to execute the chain
throw new NoConnectivityException();
} else {
Response originalResponse = null;
try {
originalResponse = chain.proceed(chain.request());
} catch (Exception ex) {
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, this.mContext.getString(R.string.connection_failed), requestPath));
Log.e(TAG, "check connectivity intercept: ",ex );
throw new IOException("IO Exception occurred");
}
return originalResponse;
}
}
}
====================
public class HTTPResponseCodeCheckInterceptor implements Interceptor {
private RxEventBus eventBus;
public HTTPResponseCodeCheckInterceptor(RxEventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public Response intercept(Chain chain) throws IOException {
if (!responseSuccess) {
if (code == HttpStatus.MOVED_TEMPORARILY.value()) {
eventBus.send(new ErrorState(ErrorType.STEP_UP_AUTHENTICATION,requestPath,rSecureCode));
} else if (code == HttpStatus.INTERNAL_SERVER_ERROR.value()) { // Error code 500
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, getAPIError(responseStringOrig),requestPath));
} else if (code == HttpStatus.SERVICE_UNAVAILABLE.value()) {
eventBus.send(new ErrorState(ErrorType.SERVICE_UNAVAILABLE, getOutageMessage(responseStringOrig),requestPath));
} else {
eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR,new APIErrorResponse(500, "Internal Server Error"),requestPath));
}
}
}
}
===================
public class RxEventBus {
private PublishSubject<ErrorState> bus = PublishSubject.create();
private RxEventBus() {
}
private static class SingletonHolder {
private static final RxEventBus INSTANCE = new RxEventBus();
}
public static RxEventBus getBus() {
return RxEventBus.SingletonHolder.INSTANCE;
}
public void send(ErrorState o) {
bus.onNext(o);
}
public Observable<ErrorState> toObserverable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
public static void register(Object subscriber) {
//bus.register(subscriber);
}
public static void unregister(Object subscriber) {
// bus.unregister(subscriber);
}
}
=====================
public class BasePresenter<V extends MVPView> implements MVPPresenter<V> {
private final CompositeDisposable mCompositeDisposable;
@Override
public void subscribe() {
initRxBus();
}
@Override
public void unsubscribe() {
RxUtil.unsubscribe(mCompositeDisposable);
}
public void addSubscription(Disposable disposable){
if(mCompositeDisposable != null){
mCompositeDisposable.add(disposable);
Log.d(TAG, "addSubscription: "+mCompositeDisposable.size());
}
}
private void initRxBus() {
addSubscription(EPGApplication.getAppInstance().eventBus()
.toObserverable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<ErrorState>() {
@Override
public void accept(ErrorState errorState) throws Exception {
if (mMvpView != null) {
mMvpView.hideLoadingIndicator();
if (ErrorType.STEP_UP_AUTHENTICATION == errorState.getType()) {
mMvpView.showStepUpAuthentication(errorState.getSecureRequestCode());
} else if (ErrorType.SERVICE_ERROR == errorState.getType()) {
mMvpView.showServiceError(((APIErrorResponse) errorState.getErrorData()).getErrorMessage());
} else if (ErrorType.SERVICE_UNAVAILABLE == errorState.getType()) {
mMvpView.showServiceUnavailable(((OutageBody) errorState.getErrorData()));
} else if (ErrorType.UNAUTHORIZED == errorState.getType()) {
mMvpView.sessionTokenExpiredRequestLogin();
} else if (ErrorType.GEO_BLOCK == errorState.getType()) {
mMvpView.showGeoBlockErrorMessage();
} else if (ErrorType.SESSION_EXPIRED == errorState.getType()) {
mMvpView.sessionTokenExpiredRequestLogin();
}else if (ErrorType.NO_INTERNET_CONNECTION == errorState.getType()) {
mMvpView.showNoNetworkConnectivityMessage();
mMvpView.showServiceError(resourceProvider.getString(R.string.no_network_connection));
}
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "base excpetion: ", throwable);
}
}));
}
}
}
==================
public class ProfilePresenter<V extends ProfileContract.View> extends BasePresenter<V>
implements ProfileContract.Presenter<V> {
public ProfilePresenter(ProfileContract.View view, CustomerRepository repository) {
super();
this.repository = repository;
}
private void updateCustomerAccountDetails(JSONObject payload) {
getMvpView().showLoadingIndicator();
addSubscription(repository.updateCustomerDetails(sharedPreferencesRepository.isStepUpAuthRequired(), AppConfig.CUSTOMER_ACCOUNT_HOLDER_UPDATE
, payload)
.compose(RxUtil.applySingleSchedulers())
.subscribeWith(new DisposableSingleObserver<BaseServerResponse>() {
@Override
public void onSuccess(BaseServerResponse response) {
if (!isViewAttached()) {
return;
}
getMvpView().hideLoadingIndicator();
getMvpView().onSuccessProfileInfoUpdate();
}
@Override
public void onError(Throwable throwable) {
if (!isViewAttached()) {
return;
}
if (throwable instanceof NoConnectivityException) {
getMvpView().showNoNetworkConnectivityMessage();
}
getMvpView().hideLoadingIndicator();
}
}));
}
}
我过去使用的一种方法是共享一个 Subject
用于传达错误情况。
在生产者方面,持有对它的引用
Subject
类型(Publish
/Behavior
/Replay
/等)以便使用应用程序发生的下一个错误调用onNext()
。在这种情况下,生产者将是ResponseCodeCheckInterceptor
实例。这个实例不会抛出各种异常,而是发出一个ErrorState
来描述刚刚发生的错误。 (假设ErrorState
是一个自定义类型,它携带了关于错误条件的足够信息,供消费者决定如何做出反应,例如更新 UI、clean-up 资源等)。在消费者端,将对共享
Subject
的引用保存为Observable<ErrorState>
。正如您似乎在做 MVP,演示者可能是您的消费者之一。
(依赖注入是共享 Subject
实例的好方法)。
希望对您有所帮助!
更新一些粗略的示例代码...
// this is descriptive way to identify unique error types you may care about
enum ErrorType {
Unauthorized,
ServiceUnavailable,
ServiceError
}
// this class should bundle together enough information about an
// error that has occurred so consumers can decide how to respond
// e.g. is this a "fatal" error, or can the associated operation be
// retried?
class ErrorState {
public final ErrorType type;
public final boolean fatal;
ErrorState(ErrorType type, boolean fatal) {
this.type = type;
this.fatal = fatal;
}
}
// the Interceptor creates new instances of ErrorState and pushes
// them through the Subject to notify downstream subscribers
class ResponseCodeCheckInterceptor implements Interceptor {
private final Subject<ErrorState> errorStateSubject;
ResponseCodeCheckInterceptor(Subject<ErrorState> errorStateSubject) {
this.errorStateSubject = errorStateSubject;
}
@Override
public Response intercept(@NonNull Chain chain) throws IOException {
final Response response = chain.proceed(chain.request());
if(response.code() == HttpStatus.UNAUTHORIZED.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.Unauthorized, false));
} else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.ServiceError, true));
} else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
errorStateSubject.onNext(new ErrorState(ErrorType.ServiceUnavailable, false));
}
return response;
}
}
// the Presenter is created with a reference to the same Subject, but
// retains a reference to it as an Observable. the Consumer instance
// supplied as the onNext() handler is where you'd put your logic to
// handle ErrorStates. also, the Presenter should be lifecycle-aware
// so as to create and dispose of subscriptions at the appropriate
// times.
class Presenter {
private final Observable<ErrorState> errorStateStream;
private Disposable errorStateSubscription;
class Presenter(Observable<ErrorState> errorStateStream) {
this.errorStateStream = errorStateStream
}
public void onStart() {
errorStateSubscription = errorStateStream.subscribe(
next -> {
/* Invoke views/etc */
},
error -> {
/* handle stream error during */
}
);
}
public void onStop() {
if(errorStateSubscription != null) {
errorStateSubscription.dispose();
}
}
}
Observable.just(new Object())
.subscribeOn(Schedulers.io())
.subscribe(new DefaultObserver<Object>() {
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
if(e instanceof HttpException) {
HttpException httpException = (HttpException) e;
if(httpException.code() == 400)
Log.d(TAG, "onError: BAD REQUEST");
else if(httpException.code() == 401)
Log.d(TAG, "onError: NOT AUTHORIZED");
else if(httpException.code() == 403)
Log.d(TAG, "onError: FORBIDDEN");
else if(httpException.code() == 404)
Log.d(TAG, "onError: NOT FOUND");
else if(httpException.code() == 500)
Log.d(TAG, "onError: INTERNAL SERVER ERROR");
else if(httpException.code() == 502)
Log.d(TAG, "onError: BAD GATEWAY");
}
}
@Override
public void onComplete() {
}
});
请注意,所有代码为 2xx 的回复都将在 onNext
中使用,代码以 4xx 开头5xx 将在 onError
.
仅供参考。在这种情况下不需要 ResponseCodeCheckInterceptor
。试试不使用自定义拦截器,这应该可以解决问题。
更新
自定义观察者
public abstract class CustomObserver extends DefaultObserver implements Observer{
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
if(e instanceof HttpException) {
HttpException httpException = (HttpException) e;
if(httpException.code() == 400)
onBadRequest(e);
else if(httpException.code() == 401)
onNotAuthorized(e);
else if(httpException.code() == 502)
onBadGateway(e);
}
}
public abstract void onNotAuthorized(Throwable e);
public abstract void onBadGateway(Throwable e);
public abstract void onBadRequest(Throwable e);
@Override
public void onComplete() {
}
}
实施
Observable.just(new Object())
.subscribeOn(Schedulers.io())
.subscribe(new CustomObserver() {
@Override
public void onNext(Object o) {
super.onNext(o);
}
@Override
public void onError(Throwable e) {
super.onError(e);
}
@Override
public void onNotAuthorized(Throwable e) {
}
@Override
public void onBadGateway(Throwable e) {
}
@Override
public void onBadRequest(Throwable e) {
}
@Override
public void onComplete() {
super.onComplete();
}
});
Kotlin 方式
如果你使用 Kotlin 和 MVVM,我建议另一种简单的方法
您可以像下面这样使用 Kotlin 扩展:
fun <T> Single<T>.handelNetworkError() =
onErrorResumeNext { e ->
when (e) {
is OfflineException -> return@onErrorResumeNext Single.error(Exception("check your internet connection "))
is SocketTimeoutException -> return@onErrorResumeNext Single.error(Exception("server not fount"))
is retrofit2.HttpException -> {
val responseBody = e.response().errorBody()
///if you want more custom error:
/* when(e.response().code()){
500 -> return@onErrorResumeNext Single.error( customException500(responseBody?.run { getErrorMessage(responseBody) }))
402 -> return@onErrorResumeNext Single.error(customException402(responseBody?.run { getErrorMessage(responseBody) }))
.... -> } */
return@onErrorResumeNext Single.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
}
is IOException -> return@onErrorResumeNext Single.error(Exception("Network error"))
else -> return@onErrorResumeNext Single.error(e)
}
}
fun <T> Observable<T>.handelNetworkError() =
onErrorResumeNext { e : Throwable ->
when (e) {
is OfflineException -> return@onErrorResumeNext Observable.error(Exception("check your internet connection"))
is SocketTimeoutException -> return@onErrorResumeNext Observable.error(Exception("server not fount"))
is retrofit2.HttpException -> {
val responseBody = e.response().errorBody()
return@onErrorResumeNext Observable.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
}
is IOException -> return@onErrorResumeNext Observable.error(Exception("Network error"))
else -> return@onErrorResumeNext Observable.error(e)
}
}
fun getErrorMessage(responseBody: ResponseBody): String? {
return try {
val jsonObject = JSONObject(responseBody.string())
jsonObject.getString("message")
} catch (e: Exception) {
e.message
}
}
class OfflineException : IOException() {
override val message: String?
get() = "no internet!"
}
用法:
api.getUserList().handelNetworkError().subscribe { }