New Single after error

I'm building my own flow for downloading user data and have run into a problem. First, some code, with each method explained:

loginManager.getAccessToken() //Returns Single<String> where string is my access token
loginManager.getRefreshToken() //Returns Single<String> where string is my refresh token
loginManager.downloadUserAccountData(String accessToken) //Returns Single<String> where string is my json response
loginManager.refreshAccessToken(String refreshToken) //Returns Single<String> where string is new access token
userManager.parseUserJson(String json) //Returns Single<User> where User is my user model
userManager.storeUser(User user) //Returns Completable

Currently I'm doing something like this:

Completable getAndStoreUserData = loginManager.getAccessToken()
        .flatMap(loginManager::downloadUserAccountData)
        .flatMap(userManager::parseUserJson)
        .flatMapCompletable(userManager::storeUser);

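But this doesn't support refreshing the access token and retrying the download. The server responds with 403 when my accessToken is no longer current, so my downloadUserAccountData function returns an error in that case.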

I have tried doOnError() and onErrorReturn(), but neither operator does what I need. doOnError() only allows a side effect such as logging and cannot return anything. onErrorReturn() only lets me return a plain value, not a Single<Value>.

Here is pseudocode for what I'm trying to achieve:

Completable getAndStoreUserData = loginManager.getAccessToken()
        .flatMap(loginManager::downloadUserAccountData)
        .onErrorReturn(error -> loginManager.getRefreshToken()
                .flatMap(loginManager::refreshAccessToken)
                .flatMap(loginManager::downloadUserAccountData))
        .flatMap(userManager::parseUserJson)
        .flatMapCompletable(userManager::storeUser);

Any examples are appreciated :)

Maybe you can use retryWhen:

Returns an Observable that emits the same values as the source ObservableSource with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the ObservableSource provided as an argument to the notificationHandler function. If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.

    // Uses io.reactivex.Flowable, io.reactivex.functions.Function and org.reactivestreams.Publisher.
    // Single.retryWhen hands the handler a Flowable<Throwable>, not an Observable.
    Completable getAndStoreUserData = loginManager.getAccessToken()
            .flatMap(loginManager::downloadUserAccountData)
            .retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
                @Override
                public Publisher<?> apply(@io.reactivex.annotations.NonNull Flowable<Throwable> errors) throws Exception {
                    return errors.flatMap(new Function<Throwable, Publisher<?>>() {
                        @Override
                        public Publisher<?> apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
                            String message = throwable.getMessage();
                            if (message != null && message.contains("403")) {  // dummy test example
                                // Any item emitted here triggers a resubscription to the source.
                                // The refreshed token has to be stored so getAccessToken() returns it on retry.
                                return loginManager.getRefreshToken()
                                        .flatMap(loginManager::refreshAccessToken)
                                        .toFlowable();
                            }
                            // Every other error is passed downstream and ends the stream.
                            return Flowable.error(throwable);
                        }
                    });
                }
            })
            .flatMap(userManager::parseUserJson)
            .flatMapCompletable(userManager::storeUser);

Here are some other examples.

Sorry for my bad English, and I'm not familiar with lambdas.

I finally got it working. As @Cochi suggested, I used the retryWhen() function. Here is a very simple String-based example for anyone else:

String body = null;

@Override
protected void onStart() {
    super.onStart();

    Single<String> stringSingle = Single.create(source -> {
                if(body != null) {
                    source.onSuccess(body);
                }
                else {
                    source.onError(new Exception("403"));
                }
            })
            .retryWhen(errors -> errors.flatMap(error -> {
                if(error.getMessage().contains("403")) {
                    Throwable throwable = refresh().blockingGet();
                    if(throwable == null) {
                        return Flowable.just(new Object());
                    }
                    return Flowable.error(throwable);
                }
                return Flowable.error(error);
            }))
            .flatMap(obj -> Single.just(obj.toString()));
}

public Completable refresh() {
    return Completable.create(source -> {
        try {
            body = "Success";
            source.onComplete();
        }
        catch (Exception e) {
            source.onError(e);
        }
    });
}

And here is the complete example using my code:

Completable getAndStoreUserData() {
    Completable getAndStoreUserData = loginManager.getAccessToken()
            .flatMap(loginManager::downloadUserAccountData)
            .retryWhen(errors -> errors.flatMap(this::retryConditionChecker))
            .flatMap(userManager::parseUserJson) // storeUser expects a User, not the raw JSON
            .flatMapCompletable(userManager::storeUser);

    return getAndStoreUserData;
}

Publisher<?> retryConditionChecker(Throwable error) {
    if(error.getMessage().contains("403")) {

        Throwable throwable = loginManager.getRefreshToken()
                .flatMap(loginManager::refreshAccessToken)
                .flatMapCompletable(loginManager::replaceAccessToken)
                .blockingGet();

        if(throwable == null) {
            return Flowable.just(new Object());
        }
        return Flowable.error(throwable);
    }
    return Flowable.error(error);
}

But the biggest problem is blockingGet(). I believe this is a very bad solution. Maybe someone knows a better one?
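
One possible direction, offered as a sketch rather than a tested fix: the missing piece in the question is an operator that resumes with another asynchronous source instead of a plain value. In RxJava 2 that operator is Single.onErrorResumeNext, whose function returns a Single, so the refresh and the second download can be chained without blockingGet(). The block below shows the same shape of composition using only java.util.concurrent.CompletableFuture so that it runs without extra dependencies; it is not RxJava code. The class TokenRefreshSketch, the "stale"/"fresh" token values and the JSON payload are all hypothetical stand-ins for the loginManager methods in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class TokenRefreshSketch {
    // Hypothetical stand-in for the stored access token; it starts out stale.
    static final AtomicReference<String> accessToken = new AtomicReference<>("stale");

    // Hypothetical stand-in for downloadUserAccountData: fails with "403" unless the token is fresh.
    static CompletableFuture<String> downloadUserAccountData(String token) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if ("fresh".equals(token)) {
            result.complete("{\"name\":\"demo\"}");
        } else {
            result.completeExceptionally(new IllegalStateException("403"));
        }
        return result;
    }

    // Hypothetical stand-in for refreshAccessToken: stores and returns the new token.
    static CompletableFuture<String> refreshAccessToken() {
        accessToken.set("fresh");
        return CompletableFuture.completedFuture("fresh");
    }

    // Same dummy check as in the thread: look for "403" in the error message.
    static boolean isForbidden(Throwable error) {
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
        String message = cause.getMessage();
        return message != null && message.contains("403");
    }

    // Download once; on a 403, refresh the token and download again, never blocking.
    static CompletableFuture<String> downloadWithRefresh() {
        CompletableFuture<CompletableFuture<String>> nested =
                downloadUserAccountData(accessToken.get()).handle((json, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(json);
                    }
                    if (isForbidden(error)) {
                        // Resume with another asynchronous source instead of a plain value.
                        return refreshAccessToken()
                                .thenCompose(TokenRefreshSketch::downloadUserAccountData);
                    }
                    // Any other error is passed through unchanged.
                    CompletableFuture<String> failed = new CompletableFuture<>();
                    failed.completeExceptionally(error);
                    return failed;
                });
        // Flatten the nested future so callers see a single result.
        return nested.thenCompose(future -> future);
    }

    public static void main(String[] args) {
        System.out.println(downloadWithRefresh().join());
    }
}
```

Unlike the retryWhen version, this retries at most once, so a refresh that keeps producing a rejected token cannot loop forever. In the RxJava chain from the question, the same idea would sit directly after the first downloadUserAccountData call and before parseUserJson.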