使用 RxJava 调用补充可观察对象(修复错误)的错误处理

Error handling to call a supplementary observable (to fix the error) with RxJava

我创建了一个接口,其中包含 return 可观察的三个方法。他们:

  1. 查询 REST API 以获取文章列表
  2. 将文章列表保存到本地数据库
  3. 从本地数据库中检索我所有的文章

但是,如果在#2 之前调用方法#3,它将 return 一个空列表,其中文章列表的大小 returned 为 0。那么我想检索来自 REST API 的新文章列表。这是我要创建的可观察链的伪代码。

  1. 致电getAllArticles()
  2. if (error occurs, or getAllArticles().size() == 0) then refreshArticles() (只调用一次),
  3. 再次呼叫getAllArticles()

我正在设计这个可观察链,以便可以按任何顺序调用 3 种方法中的任何一种,并且仍然按预期运行。我有以下界面:

/**
 * A repository abstraction in charge of handling all our data needs related to articles.
 */
public interface ArticleRepo {
    /**
     * Gets all the articles stored in the database
     *
     * @return An observable containing a list of articles
     */
    public Observable<List<ArticleEntity>> getAllArticles();


    /**
     * Refreshes the currently stored article repos, and returns
     * @return an updated {@link ArticleRepo}
     */
    public Observable<ArticleRepo> refreshArticles();

    /**
     * Saves the list of article entities to the database
     * @param articleEntityList
     * @return
     */
    Observable<List<ArticleEntity>> save(List<ArticleEntity> articleEntityList);

}

如何实现一种错误处理,以便在 getAllArticles() 无法 return 正确数据时自动调用 refreshArticles()/saveArticles()

How do I implement a type of error handling, so that refreshArticles()/saveArticles() is automatically called when getAllArticles() is not able to return the correct data?

我认为 onErrorResumeNext and switchIfEmpty 运算符对您的情况很有帮助。以下是如何使用这些运算符在 rx-java 中实施流程的示例:

final ArticleRepo repository = getCurrentArticleRepo();

final Observable<List<ArticleEntity>> refresh = repository.refreshArticles().flatMap(new Func1<ArticleRepo, Observable<List<ArticleEntity>>>() {
    @Override
    public Observable<List<ArticleEntity>> call(final ArticleRepo updated) {
        return updated.getAllArticles();
    }
}).flatMap(new Func1<List<ArticleEntity>, Observable<List<ArticleEntity>>>() {
    @Override
    public Observable<List<ArticleEntity>> call(final List<ArticleEntity> articles) {
        return repository.save(articles);
    }
});

repository.getAllArticles().onErrorResumeNext(refresh).filter(new Func1<List<ArticleEntity>, Boolean>() {
    @Override
    public Boolean call(final List<ArticleEntity> articles) {
        return articles == null || articles.size() == 0;
    }
}).switchIfEmpty(refresh).subscribe(new Action1<List<ArticleEntity>>() {
    @Override
    public void call(final List<ArticleEntity> articles) {
        // do something here
    }
});

PS:我不确定 refresh observable 是否正确实现,因为我不太明白什么时候需要调用 save 方法。