rxjava - 使用 concat 从 observerables 缓存不工作

rxjava - cache from observerables using concat not working

关注这个tutorial 我想我会创建一个假的内存、磁盘、网络对象,以便在失败时查看那里的顺序。我的目标是根据文章评论查看一个是否失败,然后继续移动到下一个:

The key to this pattern is that concat() only subscribes to each child Observable when it needs to. There's no unnecessary querying of slower sources if data is cached, since first() will stop the sequence early. In other words, if memory returns a result, then we won't bother going to disk or network. Conversely, if neither memory nor disk have data, it'll make a new network request.

然后我创建了以下测试:

 Data disk=new Data();
    disk.setAddress("38 Oriole");
    disk.setPostalCode("mc72l9");
    disk.setDate("feb 12");

Data network=new Data();
network.setAddress("39 skyway");
network.setPostalCode("mt82l9");
network.setDate("feb 13");


Observable observerMemory = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(final Subscriber subscriber) {
                subscriber.onNext(null);
            }}
);


Observable<Data> observerDisk = Observable.from(disk);
Observable<Data> observerNetwork = Observable.from(network);

Observable.concat(observerMemory, observerDisk, observerNetwork).first().subscribe(new Subscriber<Data>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        System.out.println("****" + e);
    }

    @Override
    public void onNext(Data data) {
        System.out.println(data.toString());
    }
});

然而,当 运行 这个测试时,我期待它打印磁盘可观察(流)。所以输出应该是 38 Oriole 等,因为第一个 observable 返回 null 并且 concat 应该在我认为失败的情况下获得下一个流。我的意思是根据文章,如果一个流失败,例如内存流,那么 concat 应该转到下一个流,即磁盘。但对我来说,它没有这样的事情。发射在第一个 null 错误处停止。我究竟做错了什么。在我继续使用反应式编程制作真正的缓存系统之前,我正在做这个假测试以确保这种方式有效。

顺便说一句,我使用的是 rxjava 依赖项:

compile 'com.netflix.rxjava:rxjava-android:0.20.7'

问题是 observerMemory 在订阅时调用了 onNext(null)。为了让 concat 移动到下一个 Observable,observerMemory 需要完成。如果将 subscriber.onNext(null) 替换为 subscriber.onComplete(),则第一个发出的项目将从磁盘中获取。