Retrofit2+RxJava2,无效令牌,retryWhen() 重新订阅时如何更新流
Retrofit2+RxJava2, Invalid token, how to update stream when retryWhen() re-subscribe
我在下面有这个简单的代码,它模拟了我目前正在尝试完成的场景
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
为了明确目标,我列举一下:
- 使用当前访问令牌执行 POST 调用
- 如果它收到适当的错误(404,403、401 等)
- 执行 GET 调用以获得新的访问令牌
- 使用新的访问令牌重试整个序列
根据上面的代码和我目前对 .retryWhen() 的理解,如果原始 Observable[ 发生错误,它将执行=37=]( .postSomethingWithAccessToken()),并在必要时重试(根据重试中的条件),这里发生的是 .retryWhen() 在外部 Observable 之前先执行,导致不希望的重复请求,
根据我目前的理解(代码),我怎样才能实现上面提到的那些事情?任何帮助将不胜感激。 :(
编辑:当前解决方法:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
通过共享偏好更新令牌的方法
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
我注意到(我放置了一个日志)外部可观察对象的订阅和重试在主线程中执行,但是 retrying/resubscribing 的流正在跳过不同的调度程序线程,这似乎是一个竞争条件: (
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
这里有几个问题:
- 您需要在重试时将访问令牌传回给
postSomethingWithAccessToken
方法,否则您将使用相同的旧无效访问令牌重试。
- 您在逻辑不正确时重试,您必须对收到的错误做出响应
Observable
并将您的重试逻辑放在那里。正如您所说,此方法首先执行,而不是在发生错误时执行,throwableObservable
是对错误的响应,它将错误反映为排放 (onNext()
),您可以 flatMap()
每个错误并响应错误(用于将错误传递给源流) complete ,或 onNext()
带有一些对象以发出重试信号。
关于此主题的精彩 blog post ban Dan Lew。
所以你需要:
1) 将访问令牌存储在某个地方,您可以通过刷新访问令牌来更改它。
2) 修复逻辑正确响应错误时的重试
这是一个建议代码:
postSomethingWithAccessToken(request, accessToken)
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(
@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(
new Function<Throwable, ObservableSource<? extends R>>() {
@Override
public ObservableSource<? extends R> apply(
@NonNull Throwable throwable) throws Exception {
if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
return getAccessToken()
.doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
//or keep accessToken on some field, the point to have mutable
//var that you can change and postSomethingWithAccessToken can see
}
return Observable.error(throwable);
}
});
}
}
)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Result>() {
@Override
public void accept(@NonNull Result result) throws Exception {
//handle result
}
}
);
BIG 感谢 yosriz,他为我指出了正确的方向来解决我的磨牙问题,我不得不使用 defer
。所以我在 GitHub、Why resubscribe the source observable emit same output when I use retryWhen operator?
中结束了这个问题
这与我现在遇到的问题完全相同,对于在这里遇到相同问题的任何人,这里是我的解决方案。
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
// return an observable source here, the observable that will be the source of the entire stream;
}
})
.subscribeOn( /*target thread to run*/ )
.retryWhen( {
// return a throwable observable here that will perform the logic when an error occurred
})
.subscribe( /*subscription here*/ )
或者这里是我的解决方案的完整非 lambda
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return mApiService.api().postSomethingWithAccessToken(
request, preferences.getString("access_token", ""));
}
})
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
这里的重点是"how to modify/update the existing source observable when .retryWhen()
operator resubscribe to the source observable"
我在下面有这个简单的代码,它模拟了我目前正在尝试完成的场景
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
为了明确目标,我列举一下:
- 使用当前访问令牌执行 POST 调用
- 如果它收到适当的错误(404,403、401 等)
- 执行 GET 调用以获得新的访问令牌
- 使用新的访问令牌重试整个序列
根据上面的代码和我目前对 .retryWhen() 的理解,如果原始 Observable[ 发生错误,它将执行=37=]( .postSomethingWithAccessToken()),并在必要时重试(根据重试中的条件),这里发生的是 .retryWhen() 在外部 Observable 之前先执行,导致不希望的重复请求, 根据我目前的理解(代码),我怎样才能实现上面提到的那些事情?任何帮助将不胜感激。 :(
编辑:当前解决方法:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
通过共享偏好更新令牌的方法
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
我注意到(我放置了一个日志)外部可观察对象的订阅和重试在主线程中执行,但是 retrying/resubscribing 的流正在跳过不同的调度程序线程,这似乎是一个竞争条件: (
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
这里有几个问题:
- 您需要在重试时将访问令牌传回给
postSomethingWithAccessToken
方法,否则您将使用相同的旧无效访问令牌重试。 - 您在逻辑不正确时重试,您必须对收到的错误做出响应
Observable
并将您的重试逻辑放在那里。正如您所说,此方法首先执行,而不是在发生错误时执行,throwableObservable
是对错误的响应,它将错误反映为排放 (onNext()
),您可以flatMap()
每个错误并响应错误(用于将错误传递给源流) complete ,或onNext()
带有一些对象以发出重试信号。
关于此主题的精彩 blog post ban Dan Lew。
所以你需要:
1) 将访问令牌存储在某个地方,您可以通过刷新访问令牌来更改它。
2) 修复逻辑正确响应错误时的重试
这是一个建议代码:
postSomethingWithAccessToken(request, accessToken)
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(
@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(
new Function<Throwable, ObservableSource<? extends R>>() {
@Override
public ObservableSource<? extends R> apply(
@NonNull Throwable throwable) throws Exception {
if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
return getAccessToken()
.doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
//or keep accessToken on some field, the point to have mutable
//var that you can change and postSomethingWithAccessToken can see
}
return Observable.error(throwable);
}
});
}
}
)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Result>() {
@Override
public void accept(@NonNull Result result) throws Exception {
//handle result
}
}
);
BIG 感谢 yosriz,他为我指出了正确的方向来解决我的磨牙问题,我不得不使用 defer
。所以我在 GitHub、Why resubscribe the source observable emit same output when I use retryWhen operator?
这与我现在遇到的问题完全相同,对于在这里遇到相同问题的任何人,这里是我的解决方案。
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
// return an observable source here, the observable that will be the source of the entire stream;
}
})
.subscribeOn( /*target thread to run*/ )
.retryWhen( {
// return a throwable observable here that will perform the logic when an error occurred
})
.subscribe( /*subscription here*/ )
或者这里是我的解决方案的完整非 lambda
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return mApiService.api().postSomethingWithAccessToken(
request, preferences.getString("access_token", ""));
}
})
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
这里的重点是"how to modify/update the existing source observable when .retryWhen()
operator resubscribe to the source observable"