RxJava/Retrofit2/Java - NetworkBoundResource 未按预期工作

RxJava/Retrofit2/Java - NetworkBoundResource not working as expected

我遇到了一些连线问题。当我第一次订阅时,它会进行网络调用并将数据保存到数据库,但是 loadFromDb() 永远不会执行,也不会抛出任何错误。

为什么会这样?

    Flowable<Resource<List<List<DataSource>>>> getBoundResource(List<String> parentId) {
    return new RxNetworkBoundResource<List<List<DataSource>>,
            ContainerResponse>() {
        @Override
        void saveCallResult(@NonNull List<ContainerResponse> data) {
            for (ContainerResponse item : data) {
                // Saves data to database
                List<DataSource> items = item.items;
                containerDao.insert(items);
            }
        }

        @Override
        protected Flowable<List<List<DataSource>>> loadFromDb() {
            return Flowable.just(parentId).flatMapIterable(d -> d)
                    .flatMap(s -> containerDao.loadContainerByParentIdRx(s))
                    .distinct()
                    .doOnNext(data -> {
                        // I am able to get data here
                    })
                    .toList() // I'm not able to get data after toList()
                    .toFlowable()
                    .doOnNext(data -> {
                        // Nothing here
                    });
        }

        @Override
        protected Flowable<List<Response<ContainerResponse>>> createCall() {
            String baseUrl =
                    MyApp.getApplication().getSharedConfig().getBaseUrl();
            return Flowable.just(parentId).flatMapIterable(data -> data).flatMap(s -> {
                String url = baseUrl + "?limit=30&offset=0&parent=" + s;
                return Flowable.zip(Flowable.just(s),webservice.getContainersBoundRx(url),
                        (s1, response) -> {
                            if (response.body() == null) {
                                return response;
                            }
                            for (DataSource container : response.body().items) {
                                container.parentId = s1;
                            }
                            return response;
                        }).toList().toFlowable();
            });
        }

        @Override
        protected boolean shouldFetch() {
            return false;
        }
    }.asFlowable();

我在 subscribe() 之后什么也得不到。

    containerRepo.getBoundResource(parentId)
            .subscribe(new Subscriber<Resource<List<List<DataSource>>>>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(Resource<List<List<DataSource>>> listResource) {
                   // No data
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {
                    // This is never called
                }
            });

NetworkboundResource class:

public abstract class RxNetworkBoundResource<ResultType, RequestType> {

private final String TAG = RxNetworkBoundResource.class.getSimpleName();

private Flowable<Resource<ResultType>> result;

RxNetworkBoundResource() {
    // Lazy db observable.
    Flowable<ResultType> dbObservable =
            Flowable.defer(() -> loadFromDb().subscribeOn(Schedulers.computation()));

    // Lazy network observable.
    Flowable<ResultType> networkObservable = Flowable.defer(() ->
            createCall()
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .doOnNext(request -> {
                        if (request.get(0).isSuccessful()) {
                            saveCallResult(processResponse(request));
                        } else {
                            processInternalError(request);
                        }
                    })
                    .onErrorReturn(throwable -> {
                        throw Exceptions.propagate(throwable);
                    })
                    .flatMap(__ -> loadFromDb())
    );

    result = shouldFetch()
            ? networkObservable
            .map(Resource::success)
            .onErrorReturn(t -> Resource.error(t.getMessage(), null))
            .observeOn(AndroidSchedulers.mainThread())
            : dbObservable
            .map(Resource::success)
            .onErrorReturn(t -> Resource.error(t.getMessage(), null))
            .observeOn(AndroidSchedulers.mainThread())
    ;
}

Flowable<Resource<ResultType>> asFlowable() {
    return result;
}

private List<RequestType> processResponse(List<Response<RequestType>> response) {
    List<RequestType> list = new ArrayList<>();
    for (Response<RequestType> data : response) {
        list.add(data.body());
    }
    return list;
}

private void processInternalError(List<Response<RequestType>> response) throws java.io.IOException {
    for (Response<RequestType> data : response) {
        if (data.errorBody() != null) {
            String error = data.errorBody().string();
            throw Exceptions.propagate(new Throwable(data.code() + ": " + error));
        }
    }
}

abstract void saveCallResult(@NonNull List<RequestType> item);

abstract Flowable<ResultType> loadFromDb();

abstract Flowable<List<Response<RequestType>>> createCall();

abstract boolean shouldFetch();

}

请注意,.toList() 仅在其上游完成后才会发出。 Doc

这里的问题很可能是因为这段代码 returning a Flowable 没有完成:

containerDao.loadContainerByParentIdRx(s)

如果这个 Flowable 永远不会完成,那么生成的 flatMap 也不会完成并且 toList() 不会发出任何东西。

如果您只查找一次数据库,那么一种选择是将 return 类型更改为 SingleMaybe。例如,如果你切换到 Maybe,你可以这样做:

    @Override
    protected Flowable<List<List<DataSource>>> loadFromDb() {
        return Flowable.just(parentId).flatMapIterable(d -> d)
                .flatMapMaybe(s -> containerDao.loadContainerByParentIdRx(s))
                .distinct()
                .doOnNext(data -> {
                    // I am able to get data here
                })
                .toList() // You should now get this as well.
                .toFlowable()
                .doOnNext(data -> {
                    // Nothing here
                });
    }