Refactoring Google's NetworkBoundResource class to use RxJava instead of LiveData
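Google's Android Architecture Components tutorial has a section that explains how to abstract the logic of fetching data over the network. In it, they create an abstract class called NetworkBoundResource that uses LiveData to build a reactive stream as the basis for all reactive network requests.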
    public abstract class NetworkBoundResource<ResultType, RequestType> {
        private final AppExecutors appExecutors;
        private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

        @MainThread
        NetworkBoundResource(AppExecutors appExecutors) {
            this.appExecutors = appExecutors;
            result.setValue(Resource.loading(null));
            LiveData<ResultType> dbSource = loadFromDb();
            result.addSource(dbSource, data -> {
                result.removeSource(dbSource);
                if (shouldFetch()) {
                    fetchFromNetwork(dbSource);
                } else {
                    result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
                }
            });
        }

        private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
            LiveData<ApiResponse<RequestType>> apiResponse = createCall();
            // we re-attach dbSource as a new source, it will dispatch its latest value quickly
            result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
            result.addSource(apiResponse, response -> {
                result.removeSource(apiResponse);
                result.removeSource(dbSource);
                //noinspection ConstantConditions
                if (response.isSuccessful()) {
                    appExecutors.diskIO().execute(() -> {
                        saveCallResult(processResponse(response));
                        appExecutors.mainThread().execute(() ->
                                // we specially request a new live data,
                                // otherwise we will get immediately last cached value,
                                // which may not be updated with latest results received from network.
                                result.addSource(loadFromDb(),
                                        newData -> result.setValue(Resource.success(newData)))
                        );
                    });
                } else {
                    onFetchFailed();
                    result.addSource(dbSource,
                            newData -> result.setValue(Resource.error(response.errorMessage, newData)));
                }
            });
        }

        protected void onFetchFailed() {
        }

        public LiveData<Resource<ResultType>> asLiveData() {
            return result;
        }

        @WorkerThread
        protected RequestType processResponse(ApiResponse<RequestType> response) {
            return response.body;
        }

        @WorkerThread
        protected abstract void saveCallResult(@NonNull RequestType item);

        @MainThread
        protected abstract boolean shouldFetch();

        @NonNull
        @MainThread
        protected abstract LiveData<ResultType> loadFromDb();

        @NonNull
        @MainThread
        protected abstract LiveData<ApiResponse<RequestType>> createCall();
    }
As I understand it, the logic of this class is:
a) Create a MediatorLiveData called "result" as the main return object and set its initial value to Resource.loading(null)
b) Fetch data from the Android Room db as the dbSource LiveData and add it to "result" as a source LiveData
c) On the first emission of the dbSource LiveData, remove the dbSource LiveData from "result" and call "shouldFetch()", which will
- If TRUE, call "fetchFromNetwork(dbSource)", which creates a network call via "createCall()" that returns a LiveData of the response wrapped in an ApiResponse object
  - Add the dbSource LiveData back to "result" and set the emitted values to Resource.loading(data)
  - Add the apiResponse LiveData to "result" and, on its first emission, remove both the dbSource and apiResponse LiveDatas
  - If the apiResponse is successful, call "saveCallResult(processResponse(response))", then add the database LiveData (freshly requested via loadFromDb()) back to "result" and set the emitted values to Resource.success(newData)
  - If the apiResponse fails, call "onFetchFailed()", then add the dbSource LiveData back to "result" and set the emitted values to Resource.error(response.errorMessage, newData)
- If FALSE, just add the dbSource LiveData to "result" and set the emitted values to Resource.success(newData)
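The flow described above can be sketched without LiveData or RxJava as a plain function that returns the sequence of states an observer would see. This is only an illustration under simplifying assumptions: the database is modeled as a single cached value, the network call as a Supplier that either returns fresh data or throws, and ResourceFlowSketch, its nested Resource and Status types, and emissions are hypothetical names that are not part of Google's sample.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ResourceFlowSketch {

    enum Status { LOADING, SUCCESS, ERROR }

    /** Minimal stand-in for the sample's Resource wrapper (hypothetical). */
    static final class Resource<T> {
        final Status status;
        final T data;
        final String message;

        private Resource(Status status, T data, String message) {
            this.status = status;
            this.data = data;
            this.message = message;
        }

        static <T> Resource<T> loading(T data) {
            return new Resource<>(Status.LOADING, data, null);
        }

        static <T> Resource<T> success(T data) {
            return new Resource<>(Status.SUCCESS, data, null);
        }

        static <T> Resource<T> error(String message, T data) {
            return new Resource<>(Status.ERROR, data, message);
        }

        @Override
        public String toString() {
            return status + "(" + data + ")";
        }
    }

    /**
     * Returns the states an observer would see, in order.
     * "cached" models the first value from loadFromDb(); "network" models createCall().
     */
    static <T> List<Resource<T>> emissions(boolean shouldFetch, T cached, Supplier<T> network) {
        List<Resource<T>> out = new ArrayList<>();
        out.add(Resource.loading(null));           // step a: initial value
        if (!shouldFetch) {
            out.add(Resource.success(cached));     // no fetch: serve the database value
            return out;
        }
        out.add(Resource.loading(cached));         // show cached data while fetching
        try {
            T fresh = network.get();               // stands in for save + reload from the db
            out.add(Resource.success(fresh));
        } catch (RuntimeException e) {
            out.add(Resource.error(e.getMessage(), cached)); // failure: error plus cached data
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emissions(true, "cached", () -> "fresh"));
        System.out.println(emissions(true, "cached", () -> {
            throw new IllegalStateException("timeout");
        }));
        System.out.println(emissions(false, "cached", () -> "unused"));
    }
}
```

Comparing this sequence with what an RxJava version emits is one way to check whether a refactor keeps the same behavior, for example whether the initial loading(null) state is really redundant for a given UI.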
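Assuming this interpretation of the logic is correct, I have tried to refactor this class to use RxJava Observables instead of LiveData. Here is my attempt at the refactor (I removed the initial Resource.loading(null) because I believe it is redundant).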
    public abstract class NetworkBoundResource<ResultType, RequestType> {
        private Observable<Resource<ResultType>> result;

        @MainThread
        NetworkBoundResource() {
            Observable<Resource<ResultType>> source;
            if (shouldFetch()) {
                source = createCall()
                        .subscribeOn(Schedulers.io())
                        .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                        .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                        .doOnError(t -> onFetchFailed())
                        .onErrorResumeNext(t -> {
                            return loadFromDb()
                                    .toObservable()
                                    .map(data -> Resource.error(t.getMessage(), data));
                        })
                        .observeOn(AndroidSchedulers.mainThread());
            } else {
                source = loadFromDb()
                        .toObservable()
                        .map(Resource::success);
            }
            result = Observable.concat(
                    loadFromDb()
                            .toObservable()
                            .map(Resource::loading)
                            .take(1),
                    source
            );
        }

        public Observable<Resource<ResultType>> asObservable() {return result;}

        protected void onFetchFailed() {}

        @WorkerThread
        protected RequestType processResponse(ApiResponse<RequestType> response) {return response.body;}

        @WorkerThread
        protected abstract void saveCallResult(@NonNull RequestType item);

        @MainThread
        protected abstract boolean shouldFetch();

        @NonNull
        @MainThread
        protected abstract Flowable<ResultType> loadFromDb();

        @NonNull
        @MainThread
        protected abstract Observable<ApiResponse<RequestType>> createCall();
    }
Since I am new to RxJava, my question is: have I refactored this correctly to RxJava while keeping the same logic as the LiveData version of this class?
    public abstract class ApiRepositorySource<RawResponse extends BaseResponse, ResultType> {
        // result is a Flowable because Room Database only returns Flowables
        // Retrofit response will also be folded into the stream as a Flowable
        private Flowable<ApiResource<ResultType>> result;
        private AppDatabase appDatabase;

        @MainThread
        ApiRepositorySource(AppDatabase appDatabase) {
            this.appDatabase = appDatabase;
            Flowable<ApiResource<ResultType>> source;
            if (shouldFetch()) {
                source = createCall()
                        .doOnNext(this::saveCallResult)
                        .flatMap(apiResponse -> loadFromDb().toObservable().map(ApiResource::success))
                        .doOnError(this::onFetchFailed)
                        .onErrorResumeNext(t -> {
                            return loadFromDb()
                                    .toObservable()
                                    .map(data -> {
                                        ApiResource apiResource;
                                        if (t instanceof HttpException && ((HttpException) t).code() >= 400 && ((HttpException) t).code() < 500) {
                                            apiResource = ApiResource.invalid(t.getMessage(), data);
                                        } else {
                                            apiResource = ApiResource.error(t.getMessage(), data);
                                        }
                                        return apiResource;
                                    });
                        })
                        .toFlowable(BackpressureStrategy.LATEST);
            } else {
                source = loadFromDb()
                        .subscribeOn(Schedulers.io())
                        .map(ApiResource::success);
            }
            result = Flowable.concat(initLoadDb()
                            .map(ApiResource::loading)
                            .take(1),
                    source)
                    .subscribeOn(Schedulers.io());
        }

        public Observable<ApiResource<ResultType>> asObservable() {
            return result.toObservable();
        }

        @SuppressWarnings("WeakerAccess")
        protected void onFetchFailed(Throwable t) {
            Timber.e(t);
        }

        // The parameter type matches the class's RawResponse type parameter
        @WorkerThread
        protected void saveCallResult(@NonNull RawResponse resultType) {
            resultType.saveResponseToDb(appDatabase);
        }

        @MainThread
        protected abstract boolean shouldFetch();

        @NonNull
        @MainThread
        protected abstract Flowable<ResultType> loadFromDb();

        @NonNull
        @MainThread
        protected abstract Observable<RawResponse> createCall();

        @NonNull
        @MainThread
        protected Flowable<ResultType> initLoadDb() {
            return loadFromDb();
        }
    }
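So this is what I settled on after many iterations. It is currently in production and works well for my app. Here are some takeaway notes:
Create a BaseResponse interface
    public interface BaseResponse {
        void saveResponseToDb(AppDatabase appDatabase);
    }
and implement it in all of your API response classes. Doing so means you don't have to implement the save-to-database logic in every ApiResource; you can simply default to the response's own implementation if you want.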
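As a rough illustration of that idea, the sketch below shows a response object persisting itself through the interface, so the generic save step needs no per-resource code. It is a plain Java approximation: the AppDatabase here is a hypothetical in-memory stand-in rather than a Room database, and BaseResponseSketch, UserResponse's fields and the static saveCallResult helper are assumed names for the example only.

```java
import java.util.HashMap;
import java.util.Map;

public class BaseResponseSketch {

    /** Hypothetical in-memory stand-in for the Room AppDatabase. */
    static final class AppDatabase {
        final Map<Long, String> userNames = new HashMap<>();
    }

    /** Same shape as the BaseResponse interface above. */
    interface BaseResponse {
        void saveResponseToDb(AppDatabase appDatabase);
    }

    /** Hypothetical response object that knows how to persist itself. */
    static final class UserResponse implements BaseResponse {
        final long id;
        final String name;

        UserResponse(long id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public void saveResponseToDb(AppDatabase appDatabase) {
            appDatabase.userNames.put(id, name);
        }
    }

    /** Generic save step: delegates to whatever response type it is given. */
    static <R extends BaseResponse> void saveCallResult(R response, AppDatabase db) {
        response.saveResponseToDb(db);
    }

    public static void main(String[] args) {
        AppDatabase db = new AppDatabase();
        saveCallResult(new UserResponse(7L, "Ada"), db);
        System.out.println(db.userNames.get(7L));
    }
}
```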
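For simplicity's sake I chose to handle Retrofit error responses in the onErrorResumeNext block, but I recommend creating a Transformer class that can house all of this logic. In this case, I added an extra Status enum value for ApiResources, called INVALID, for 400-level responses.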
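The 400-level check could be pulled out of the onErrorResumeNext lambda into a small helper, which would also be a first step toward a dedicated Transformer. The sketch below is a plain Java approximation under stated assumptions: FakeHttpException is a hypothetical stand-in for Retrofit's HttpException so that the example runs without Retrofit, and ErrorClassifier, Status and statusFor are illustrative names, not taken from the code above.

```java
public class ErrorClassifier {

    /** Status values for an ApiResource; INVALID marks 400-level responses. */
    enum Status { LOADING, SUCCESS, ERROR, INVALID }

    /** Hypothetical stand-in for Retrofit's HttpException, carrying only the status code. */
    static final class FakeHttpException extends RuntimeException {
        private final int code;

        FakeHttpException(int code) {
            super("HTTP " + code);
            this.code = code;
        }

        int code() {
            return code;
        }
    }

    /** Maps a failure to INVALID for 4xx HTTP errors and to ERROR for everything else. */
    static Status statusFor(Throwable t) {
        if (t instanceof FakeHttpException) {
            int code = ((FakeHttpException) t).code();
            if (code >= 400 && code < 500) {
                return Status.INVALID;
            }
        }
        return Status.ERROR;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(new FakeHttpException(404)));
        System.out.println(statusFor(new FakeHttpException(503)));
        System.out.println(statusFor(new java.io.IOException("no network")));
    }
}
```

With a helper like this, the error-handling lambda only has to pick the ApiResource factory method that matches the returned status.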
You may be tempted to use the Reactive Streams architecture components library for LiveData
    implementation "android.arch.lifecycle:reactivestreams:$lifecycle_version"
and add a method called asLiveData to this class:
    public LiveData<ApiResource<ResultType>> asLiveData() {
        return LiveDataReactiveStreams.fromPublisher(result);
    }
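In theory this would work perfectly, since our ViewModels wouldn't have to convert Observable emissions into LiveData emissions or implement lifecycle logic for the Observables in the views. Unfortunately, this stream gets rebuilt on every configuration change, because it disposes the LiveData in any onDestroy that is called (whether isFinishing is true or false). So we either have to manage the lifecycle of this stream ourselves, which defeats the purpose of using it in the first place, or accept duplicate calls every time the device rotates.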
Here is an example of a UserRepository creating an instance of ApiRepositorySource:
    @Singleton
    public class UserRepository {
        private final RetrofitApi retrofitApi;
        private final AppDatabase appDatabase;

        @Inject
        UserRepository(RetrofitApi retrofitApi, AppDatabase appDatabase) {
            this.retrofitApi = retrofitApi;
            this.appDatabase = appDatabase;
        }

        public Observable<ApiResource<User>> getUser(long userId) {
            return new ApiRepositorySource<UserResponse, User>(appDatabase) {
                @Override
                protected boolean shouldFetch() {
                    return true;
                }

                @NonNull
                @Override
                protected Flowable<User> loadFromDb() {
                    return appDatabase.userDao().getUserFlowable(userId);
                }

                @NonNull
                @Override
                protected Observable<UserResponse> createCall() {
                    return retrofitApi.getUserById(userId);
                }
            }.asObservable();
        }
    }