RxJava2 concat 运算符不 return 适当的流

RxJava2 concat operator doesn't return appropriate stream

为了构建我的存储库,我正在从本地数据库和远程数据库中检索 2 个数据流 API。仅当 mQuakesLocalDataSource 没有项目时,我才尝试通过使用 concat 运算符访问远程流,如许多示例所示。

@NonNull
@Override
public Single<List<Quake>> getQuakes(){
    return Single.concat(mQuakesLocalDataSource.getQuakes(),
                            mQuakesRemoteDataSource.getQuakes())
                              .first(dummyList);
}

我面临的问题是 mQuakesRemoteDataSource 从来没有 returns 流,而 mQuakesLocalDataSource 是空的,因此我没有任何数据结果。我已经单独测试了 mQuakesRemoteDataSource,没有 concat 运算符,它似乎正在检索其适当的流。

为什么会这样?

mQuakesLocalDataSource 基于 Room,因此它应该发出其流然后完成,因此本地源不可能像 SQLbrite 那样发出永无止境的流。

我试过这个运算符的变体,比如 concatArray,结果是一样的,没有任何数据被检索到。

一个有趣的事实是,在调试时,我注意到 mQuakesLocalDataSourcemQuakesRemoteDataSource get 方法在传递到 concat 运算符之前都会被触发到达 first() 行。 concat 不应该在 first 运算符存在的情况下一个一个地评估来源(并过滤当前的来源)吗?

我还尝试添加一个带有 Predicate 的过滤器,以便将数据缓存到本地数据源中,如下所示:

@NonNull
@Override
public Single<List<Quake>> getQuakes() {
    return Single.concat(mQuakesLocalDataSource.getQuakes(),
            mQuakesRemoteDataSource.getQuakes()).filter(new Predicate<List<Quake>>() {
        @Override
        public boolean test(List<Quake> quakes) throws Exception {
            boolean isValid = quakes != null && !quakes.isEmpty();

            // save items to local data source
            if (isValid) saveQuakes(quakes);

            return isValid;
        }
    }).first(new ArrayList<>());
}

结果相同,未检索到任何数据。

Single 中返回一个空列表不会使 Single 为空,因此 first 将正确地停在第一项,空列表,从不调用远程源.您必须通过 flatMap 决定是否使用远程源而不是 concat 恢复:

mQuakesLocalDataSource.getQuakes()
.flatMap(list -> {
    if (list.isEmpty()) {
        return mQuakesRemoteDataSource.getQuakes();
    }
    return Single.just(list);
})