RxJava/Retrofit2/Java - NetworkBoundResource 未按预期工作
RxJava/Retrofit2/Java - NetworkBoundResource not working as expected
我在把各个数据流衔接起来时遇到了问题。第一次订阅时，它会发起网络请求并将数据保存到数据库，但是 loadFromDb()
似乎永远不会执行，也不会抛出任何错误。
为什么会这样？
Flowable<Resource<List<List<DataSource>>>> getBoundResource(List<String> parentId) {
return new RxNetworkBoundResource<List<List<DataSource>>,
ContainerResponse>() {
@Override
void saveCallResult(@NonNull List<ContainerResponse> data) {
for (ContainerResponse item : data) {
// Saves data to database
List<DataSource> items = item.items;
containerDao.insert(items);
}
}
@Override
protected Flowable<List<List<DataSource>>> loadFromDb() {
return Flowable.just(parentId).flatMapIterable(d -> d)
.flatMap(s -> containerDao.loadContainerByParentIdRx(s))
.distinct()
.doOnNext(data -> {
// I am able to get data here
})
.toList() // I'm not able to get data after toList()
.toFlowable()
.doOnNext(data -> {
// Nothing here
});
}
@Override
protected Flowable<List<Response<ContainerResponse>>> createCall() {
String baseUrl =
MyApp.getApplication().getSharedConfig().getBaseUrl();
return Flowable.just(parentId).flatMapIterable(data -> data).flatMap(s -> {
String url = baseUrl + "?limit=30&offset=0&parent=" + s;
return Flowable.zip(Flowable.just(s),webservice.getContainersBoundRx(url),
(s1, response) -> {
if (response.body() == null) {
return response;
}
for (DataSource container : response.body().items) {
container.parentId = s1;
}
return response;
}).toList().toFlowable();
});
}
@Override
protected boolean shouldFetch() {
return false;
}
}.asFlowable();
调用 subscribe() 之后，我什么数据也收不到。
// Subscribes to the bound resource with a plain Reactive Streams Subscriber.
containerRepo.getBoundResource(parentId)
        .subscribe(new Subscriber<Resource<List<List<DataSource>>>>() {
            @Override
            public void onSubscribe(Subscription s) {
                // A raw Subscriber receives no onNext until it signals demand, so request
                // an unbounded amount up front.
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Resource<List<List<DataSource>>> listResource) {
                // Handle the emitted resource here.
            }

            @Override
            public void onError(Throwable t) {
                // NOTE(review): errors reaching this point are silently dropped; surface them
                // with the project's logging or UI error handling.
            }

            @Override
            public void onComplete() {
                // Invoked after the upstream has completed.
            }
        });
RxNetworkBoundResource 类：
/**
 * Exposes a {@link Flowable} of {@link Resource} values that is served either from the database
 * alone or from a network call whose result is saved and then read back from the database.
 *
 * @param <ResultType>  type emitted by {@link #loadFromDb()}
 * @param <RequestType> body type of the responses emitted by {@link #createCall()}
 */
public abstract class RxNetworkBoundResource<ResultType, RequestType> {
    private static final String TAG = RxNetworkBoundResource.class.getSimpleName();

    private final Flowable<Resource<ResultType>> result;

    /**
     * Assembles the stream. No subclass method is invoked here; every call is deferred until the
     * stream is subscribed to, so subclasses are fully constructed by then.
     */
    RxNetworkBoundResource() {
        // Lazy db observable.
        Flowable<ResultType> dbObservable =
                Flowable.defer(() -> loadFromDb().subscribeOn(Schedulers.computation()));

        // Lazy network observable: fetch, persist, then read back from the database.
        Flowable<ResultType> networkObservable = Flowable.defer(() ->
                createCall()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.computation())
                        .doOnNext(request -> {
                            if (allSuccessful(request)) {
                                saveCallResult(processResponse(request));
                            } else {
                                processInternalError(request);
                            }
                        })
                        // Errors are deliberately not intercepted here: rethrowing from an
                        // onErrorReturn handler would wrap them in a CompositeException and hide
                        // the original message from Resource.error below.
                        .flatMap(__ -> loadFromDb())
        );

        // shouldFetch() is consulted per subscription instead of during construction.
        Flowable<ResultType> source =
                Flowable.defer(() -> shouldFetch() ? networkObservable : dbObservable);

        result = source
                .map(Resource::success)
                .onErrorReturn(t -> Resource.error(t.getMessage(), null))
                .observeOn(AndroidSchedulers.mainThread());
    }

    /**
     * @return the assembled stream; failures are delivered as {@code Resource.error} items
     */
    Flowable<Resource<ResultType>> asFlowable() {
        return result;
    }

    /** Returns {@code true} when every response in the batch reports success. */
    private boolean allSuccessful(List<Response<RequestType>> responses) {
        for (Response<RequestType> response : responses) {
            if (!response.isSuccessful()) {
                return false;
            }
        }
        return true;
    }

    /** Extracts the non-null bodies of the given responses, preserving their order. */
    private List<RequestType> processResponse(List<Response<RequestType>> response) {
        List<RequestType> list = new ArrayList<>(response.size());
        for (Response<RequestType> data : response) {
            RequestType body = data.body();
            // Skip missing bodies so saveCallResult never sees null elements.
            if (body != null) {
                list.add(body);
            }
        }
        return list;
    }

    /**
     * Fails with the status code and error body of the first response that carries an error body.
     *
     * @throws java.io.IOException describing the failed response, or raised while reading its body
     */
    private void processInternalError(List<Response<RequestType>> response) throws java.io.IOException {
        for (Response<RequestType> data : response) {
            if (data.errorBody() != null) {
                String error = data.errorBody().string();
                throw new java.io.IOException(data.code() + ": " + error);
            }
        }
    }

    /** Persists the bodies of a successful batch of responses. */
    abstract void saveCallResult(@NonNull List<RequestType> item);

    /** Creates the stream that reads the result from the database. */
    abstract Flowable<ResultType> loadFromDb();

    /** Creates the stream that performs the network request(s). */
    abstract Flowable<List<Response<RequestType>>> createCall();

    /** Decides whether the network is used ({@code true}) or only the database ({@code false}). */
    abstract boolean shouldFetch();
}
请注意，.toList() 只有在其上游完成（onComplete）之后才会发出结果（参见 RxJava 文档）。
这里的问题很可能是因为下面这段代码返回的 Flowable 永远不会完成：
containerDao.loadContainerByParentIdRx(s)
如果这个 Flowable 永远不会完成，那么由 flatMap 生成的流也不会完成，toList() 也就不会发出任何数据。
如果您只需要查询一次数据库，那么一种选择是将该 DAO 方法的返回类型更改为 Single 或 Maybe。例如，如果切换到 Maybe，可以这样写：
/**
 * Looks up the containers of every parent id and gathers the distinct results into one list.
 * The DAO call is consumed through {@code flatMapMaybe}, so each lookup terminates on its own
 * and {@code toList()} is able to emit.
 */
@Override
protected Flowable<List<List<DataSource>>> loadFromDb() {
    Flowable<String> ids = Flowable.just(parentId).flatMapIterable(list -> list);
    return ids
            .flatMapMaybe(id -> containerDao.loadContainerByParentIdRx(id))
            .distinct()
            .doOnNext(rows -> {
                // Per-parent results are visible at this point.
            })
            .toList() // Emits once all lookups have terminated.
            .toFlowable()
            .doOnNext(all -> {
                // Hook for inspecting the collected list.
            });
}
我在把各个数据流衔接起来时遇到了问题。第一次订阅时，它会发起网络请求并将数据保存到数据库，但是 loadFromDb()
似乎永远不会执行，也不会抛出任何错误。
为什么会这样？
Flowable<Resource<List<List<DataSource>>>> getBoundResource(List<String> parentId) {
return new RxNetworkBoundResource<List<List<DataSource>>,
ContainerResponse>() {
@Override
void saveCallResult(@NonNull List<ContainerResponse> data) {
for (ContainerResponse item : data) {
// Saves data to database
List<DataSource> items = item.items;
containerDao.insert(items);
}
}
@Override
protected Flowable<List<List<DataSource>>> loadFromDb() {
return Flowable.just(parentId).flatMapIterable(d -> d)
.flatMap(s -> containerDao.loadContainerByParentIdRx(s))
.distinct()
.doOnNext(data -> {
// I am able to get data here
})
.toList() // I'm not able to get data after toList()
.toFlowable()
.doOnNext(data -> {
// Nothing here
});
}
@Override
protected Flowable<List<Response<ContainerResponse>>> createCall() {
String baseUrl =
MyApp.getApplication().getSharedConfig().getBaseUrl();
return Flowable.just(parentId).flatMapIterable(data -> data).flatMap(s -> {
String url = baseUrl + "?limit=30&offset=0&parent=" + s;
return Flowable.zip(Flowable.just(s),webservice.getContainersBoundRx(url),
(s1, response) -> {
if (response.body() == null) {
return response;
}
for (DataSource container : response.body().items) {
container.parentId = s1;
}
return response;
}).toList().toFlowable();
});
}
@Override
protected boolean shouldFetch() {
return false;
}
}.asFlowable();
调用 subscribe() 之后，我什么数据也收不到。
// Subscribes to the bound resource with a plain Reactive Streams Subscriber.
containerRepo.getBoundResource(parentId)
        .subscribe(new Subscriber<Resource<List<List<DataSource>>>>() {
            @Override
            public void onSubscribe(Subscription s) {
                // A raw Subscriber receives no onNext until it signals demand, so request
                // an unbounded amount up front.
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Resource<List<List<DataSource>>> listResource) {
                // Handle the emitted resource here.
            }

            @Override
            public void onError(Throwable t) {
                // NOTE(review): errors reaching this point are silently dropped; surface them
                // with the project's logging or UI error handling.
            }

            @Override
            public void onComplete() {
                // Invoked after the upstream has completed.
            }
        });
RxNetworkBoundResource 类：
/**
 * Exposes a {@link Flowable} of {@link Resource} values that is served either from the database
 * alone or from a network call whose result is saved and then read back from the database.
 *
 * @param <ResultType>  type emitted by {@link #loadFromDb()}
 * @param <RequestType> body type of the responses emitted by {@link #createCall()}
 */
public abstract class RxNetworkBoundResource<ResultType, RequestType> {
    private static final String TAG = RxNetworkBoundResource.class.getSimpleName();

    private final Flowable<Resource<ResultType>> result;

    /**
     * Assembles the stream. No subclass method is invoked here; every call is deferred until the
     * stream is subscribed to, so subclasses are fully constructed by then.
     */
    RxNetworkBoundResource() {
        // Lazy db observable.
        Flowable<ResultType> dbObservable =
                Flowable.defer(() -> loadFromDb().subscribeOn(Schedulers.computation()));

        // Lazy network observable: fetch, persist, then read back from the database.
        Flowable<ResultType> networkObservable = Flowable.defer(() ->
                createCall()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.computation())
                        .doOnNext(request -> {
                            if (allSuccessful(request)) {
                                saveCallResult(processResponse(request));
                            } else {
                                processInternalError(request);
                            }
                        })
                        // Errors are deliberately not intercepted here: rethrowing from an
                        // onErrorReturn handler would wrap them in a CompositeException and hide
                        // the original message from Resource.error below.
                        .flatMap(__ -> loadFromDb())
        );

        // shouldFetch() is consulted per subscription instead of during construction.
        Flowable<ResultType> source =
                Flowable.defer(() -> shouldFetch() ? networkObservable : dbObservable);

        result = source
                .map(Resource::success)
                .onErrorReturn(t -> Resource.error(t.getMessage(), null))
                .observeOn(AndroidSchedulers.mainThread());
    }

    /**
     * @return the assembled stream; failures are delivered as {@code Resource.error} items
     */
    Flowable<Resource<ResultType>> asFlowable() {
        return result;
    }

    /** Returns {@code true} when every response in the batch reports success. */
    private boolean allSuccessful(List<Response<RequestType>> responses) {
        for (Response<RequestType> response : responses) {
            if (!response.isSuccessful()) {
                return false;
            }
        }
        return true;
    }

    /** Extracts the non-null bodies of the given responses, preserving their order. */
    private List<RequestType> processResponse(List<Response<RequestType>> response) {
        List<RequestType> list = new ArrayList<>(response.size());
        for (Response<RequestType> data : response) {
            RequestType body = data.body();
            // Skip missing bodies so saveCallResult never sees null elements.
            if (body != null) {
                list.add(body);
            }
        }
        return list;
    }

    /**
     * Fails with the status code and error body of the first response that carries an error body.
     *
     * @throws java.io.IOException describing the failed response, or raised while reading its body
     */
    private void processInternalError(List<Response<RequestType>> response) throws java.io.IOException {
        for (Response<RequestType> data : response) {
            if (data.errorBody() != null) {
                String error = data.errorBody().string();
                throw new java.io.IOException(data.code() + ": " + error);
            }
        }
    }

    /** Persists the bodies of a successful batch of responses. */
    abstract void saveCallResult(@NonNull List<RequestType> item);

    /** Creates the stream that reads the result from the database. */
    abstract Flowable<ResultType> loadFromDb();

    /** Creates the stream that performs the network request(s). */
    abstract Flowable<List<Response<RequestType>>> createCall();

    /** Decides whether the network is used ({@code true}) or only the database ({@code false}). */
    abstract boolean shouldFetch();
}
请注意，.toList() 只有在其上游完成（onComplete）之后才会发出结果（参见 RxJava 文档）。
这里的问题很可能是因为下面这段代码返回的 Flowable 永远不会完成：
containerDao.loadContainerByParentIdRx(s)
如果这个 Flowable 永远不会完成，那么由 flatMap 生成的流也不会完成，toList() 也就不会发出任何数据。
如果您只需要查询一次数据库，那么一种选择是将该 DAO 方法的返回类型更改为 Single 或 Maybe。例如，如果切换到 Maybe，可以这样写：
/**
 * Looks up the containers of every parent id and gathers the distinct results into one list.
 * The DAO call is consumed through {@code flatMapMaybe}, so each lookup terminates on its own
 * and {@code toList()} is able to emit.
 */
@Override
protected Flowable<List<List<DataSource>>> loadFromDb() {
    Flowable<String> ids = Flowable.just(parentId).flatMapIterable(list -> list);
    return ids
            .flatMapMaybe(id -> containerDao.loadContainerByParentIdRx(id))
            .distinct()
            .doOnNext(rows -> {
                // Per-parent results are visible at this point.
            })
            .toList() // Emits once all lookups have terminated.
            .toFlowable()
            .doOnNext(all -> {
                // Hook for inspecting the collected list.
            });
}