Refactoring Google's NetworkBoundResource class to use RxJava instead of LiveData

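Google's Android Architecture Components tutorial here has a section that explains how to abstract the logic of fetching data over the network. In it, they create an abstract class called NetworkBoundResource that uses LiveData to build a reactive stream serving as the basis for all reactive network requests.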

public abstract class NetworkBoundResource<ResultType, RequestType> {
private final AppExecutors appExecutors;

private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

@MainThread
NetworkBoundResource(AppExecutors appExecutors) {
    this.appExecutors = appExecutors;
    result.setValue(Resource.loading(null));
    LiveData<ResultType> dbSource = loadFromDb();
    result.addSource(dbSource, data -> {
        result.removeSource(dbSource);
        if (shouldFetch()) {
            fetchFromNetwork(dbSource);
        } else {
            result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
        }
    });
}

private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
    LiveData<ApiResponse<RequestType>> apiResponse = createCall();
    // we re-attach dbSource as a new source, it will dispatch its latest value quickly
    result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
    result.addSource(apiResponse, response -> {
        result.removeSource(apiResponse);
        result.removeSource(dbSource);
        //noinspection ConstantConditions
        if (response.isSuccessful()) {
            appExecutors.diskIO().execute(() -> {
                saveCallResult(processResponse(response));
                appExecutors.mainThread().execute(() ->
                        // we specially request a new live data,
                        // otherwise we will get immediately last cached value,
                        // which may not be updated with latest results received from network.
                        result.addSource(loadFromDb(),
                                newData -> result.setValue(Resource.success(newData)))
                );
            });
        } else {
            onFetchFailed();
            result.addSource(dbSource,
                    newData -> result.setValue(Resource.error(response.errorMessage, newData)));
        }
    });
}

protected void onFetchFailed() {
}

public LiveData<Resource<ResultType>> asLiveData() {
    return result;
}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {
    return response.body;
}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract LiveData<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract LiveData<ApiResponse<RequestType>> createCall();
}
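
The class above relies on a Resource wrapper that is not shown in the excerpt. As a reading aid, here is a minimal sketch of what such a wrapper could look like, assuming it only carries a status, the (possibly null) data and an optional error message. The field names and the toString format are my own assumptions, not taken from the sample.

```java
// Minimal sketch of a Resource wrapper (assumed shape, not copied from the sample).
enum Status { LOADING, SUCCESS, ERROR }

final class Resource<T> {
    final Status status;
    final T data;          // may be null, e.g. while nothing is cached yet
    final String message;  // only set for ERROR

    private Resource(Status status, T data, String message) {
        this.status = status;
        this.data = data;
        this.message = message;
    }

    // Factory methods matching the calls used in NetworkBoundResource.
    static <T> Resource<T> loading(T data) {
        return new Resource<>(Status.LOADING, data, null);
    }

    static <T> Resource<T> success(T data) {
        return new Resource<>(Status.SUCCESS, data, null);
    }

    static <T> Resource<T> error(String message, T data) {
        return new Resource<>(Status.ERROR, data, message);
    }

    @Override
    public String toString() {
        return status + "(" + data + (message == null ? "" : ", " + message) + ")";
    }
}
```

Keeping the data field on LOADING and ERROR is what lets the UI keep showing cached content while a request is in flight or after it has failed.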

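As I understand it, the logic of this class is:

a) Create a MediatorLiveData called "result" as the main return object and set its initial value to Resource.loading(null)

b) Get data from the Android Room db as the dbSource LiveData and add it to "result" as a source LiveData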

c) On the first emission of the dbSource LiveData, remove dbSource from "result" and call "shouldFetch()", which will

  1. If true, call "fetchFromNetwork(dbSource)", which creates a network call via "createCall()" that returns a LiveData of the response wrapped in an ApiResponse object
  2. Add the dbSource LiveData back to "result" and set emitted values to Resource.loading(data)
  3. Add the apiResponse LiveData to "result" and, on its first emission, remove both the dbSource and apiResponse LiveDatas
  4. If apiResponse is successful, call "saveCallResult(processResponse(response))", add a fresh loadFromDb() LiveData to "result" and set emitted values to Resource.success(newData)
  5. If apiResponse failed, call "onFetchFailed()", add the dbSource LiveData back to "result" and set emitted values to Resource.error(response.errorMessage, newData)
  6. If shouldFetch() is false, just add the dbSource LiveData to "result" and set emitted values to Resource.success(newData)
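
The steps above can be checked against a plain-Java trace of the values "result" would emit. This is only a sketch under simplifying assumptions: the database emits exactly once per subscription, the value read back after saveCallResult equals the fetched value, and emissions are represented as strings. EmissionTrace and its parameters are hypothetical names, not part of the original class.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java trace of the emission order described in steps a) to c).
final class EmissionTrace {
    /**
     * @param cached      value currently in the database (dbSource's first emission)
     * @param shouldFetch what shouldFetch() would return
     * @param fetched     value returned by the network, or null if the call failed
     * @param errorMsg    error message used when the call failed
     */
    static List<String> trace(String cached, boolean shouldFetch, String fetched, String errorMsg) {
        List<String> out = new ArrayList<>();
        out.add("loading(null)");                 // a) initial value set in the constructor
        if (!shouldFetch) {
            out.add("success(" + cached + ")");   // false branch: serve the cache
            return out;
        }
        out.add("loading(" + cached + ")");       // dbSource re-attached while fetching
        if (fetched != null) {
            // saveCallResult(...) stores the response, then the db is read again
            out.add("success(" + fetched + ")");
        } else {
            // onFetchFailed(), then the cached value is re-emitted with the error
            out.add("error(" + errorMsg + ", " + cached + ")");
        }
        return out;
    }
}
```

A refactored RxJava version can be compared against the same three sequences (no fetch, fetch succeeded, fetch failed) to see whether it preserves the original behaviour, including the initial loading(null).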

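Assuming this is a correct interpretation of the logic, I have tried to refactor this class to use RxJava Observables instead of LiveData. Here is my attempt at a successful refactor (I removed the initial Resource.loading(null) because I believe it is redundant).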

public abstract class NetworkBoundResource<ResultType, RequestType> {

private Observable<Resource<ResultType>> result;

@MainThread
NetworkBoundResource() {
    Observable<Resource<ResultType>> source;
    if (shouldFetch()) {
        source = createCall()
                .subscribeOn(Schedulers.io())
                .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                .doOnError(t -> onFetchFailed())
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> Resource.error(t.getMessage(), data));
                })
                .observeOn(AndroidSchedulers.mainThread());
    } else {
        source = loadFromDb()
                .toObservable()
                .map(Resource::success);
    }

    result = Observable.concat(
            loadFromDb()
                    .toObservable()
                    .map(Resource::loading)
                    .take(1),
            source
    );
}

public Observable<Resource<ResultType>> asObservable() {return result;}

protected void onFetchFailed() {}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {return response.body;}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract Flowable<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract Observable<ApiResponse<RequestType>> createCall();
}

Since I am new to RxJava, my question is: have I refactored this correctly to RxJava while keeping the same logic as the LiveData version of this class?

public abstract class ApiRepositorySource<RawResult extends BaseResponse, ResultType> {

    // result is a Flowable because Room Database only returns Flowables
    // Retrofit response will also be folded into the stream as a Flowable
    private Flowable<ApiResource<ResultType>> result; 
    private AppDatabase appDatabase;

    @MainThread
    ApiRepositorySource(AppDatabase appDatabase) {
        this.appDatabase = appDatabase;
        Flowable<ApiResource<ResultType>> source;
        if (shouldFetch()) {
            source = createCall()
                .doOnNext(this::saveCallResult)
                .flatMap(apiResponse -> loadFromDb().toObservable().map(ApiResource::success))
                .doOnError(this::onFetchFailed)
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> {
                                ApiResource apiResource;

                                if (t instanceof HttpException && ((HttpException) t).code() >= 400 && ((HttpException) t).code() < 500) {
                                    apiResource = ApiResource.invalid(t.getMessage(), data);
                                } else {
                                    apiResource = ApiResource.error(t.getMessage(), data);
                                }

                                return apiResource;
                            });
                })
                .toFlowable(BackpressureStrategy.LATEST);
        } else {
            source = loadFromDb()
                    .subscribeOn(Schedulers.io())
                    .map(ApiResource::success);
        }

        result = Flowable.concat(initLoadDb()
                            .map(ApiResource::loading)
                            .take(1),
                            source)
                .subscribeOn(Schedulers.io());
    }

    public Observable<ApiResource<ResultType>> asObservable() {
        return result.toObservable();
    }

    @SuppressWarnings("WeakerAccess")
    protected void onFetchFailed(Throwable t) {
        Timber.e(t);
    }

    @WorkerThread
    protected void saveCallResult(@NonNull RawResult resultType) {
        resultType.saveResponseToDb(appDatabase);
    }

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<RawResult> createCall();

    @NonNull
    @MainThread
    protected Flowable<ResultType> initLoadDb() {
        return loadFromDb();
    }
}
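
The 4xx check inside onErrorResumeNext above can be pulled out into a small helper, which also makes it easy to test on its own. The sketch below is plain Java with no Retrofit dependency; ApiStatus and ErrorClassifier are hypothetical names, and the real ApiResource and its Status enum are not shown in this post, so treat the shape as an assumption.

```java
// Hypothetical status enum: INVALID is the extra value used for 400-level responses.
enum ApiStatus { LOADING, SUCCESS, ERROR, INVALID }

final class ErrorClassifier {
    /**
     * Maps the outcome of a failed call to a status.
     *
     * @param httpCodeOrNull HTTP status code of the failure, or null for
     *                       non-HTTP failures such as timeouts or no connectivity
     */
    static ApiStatus classify(Integer httpCodeOrNull) {
        if (httpCodeOrNull == null) {
            return ApiStatus.ERROR;               // no HTTP response at all
        }
        int code = httpCodeOrNull;
        // Same range check as in the onErrorResumeNext block: 400 <= code < 500.
        return (code >= 400 && code < 500) ? ApiStatus.INVALID : ApiStatus.ERROR;
    }
}
```

In the class above, the caller would pass ((HttpException) t).code() when t is an HttpException and null otherwise.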

So this is what I decided to go with after many iterations. It is currently in production and works well for my app. Here are some takeaway notes:

  1. Create a BaseResponse interface

        public interface BaseResponse {
             void saveResponseToDb(AppDatabase appDatabase);
        }
    

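    and implement it in all of your API response object classes. Doing this means you don't have to implement the save-to-database logic in every ApiResource; if needed, you can default it to the response's own implementation.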

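  2. For simplicity, I chose to handle Retrofit error responses in the onErrorResumeNext block, but I recommend creating a Transformer class that houses all of this logic. In this case, I added an extra Status enum value to ApiResource called INVALID for 400-level responses.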

  3. You may be tempted to use the Reactive Streams architecture components library for LiveData

        implementation "android.arch.lifecycle:reactivestreams:$lifecycle_version"

    and add a method like this to the class:

        public LiveData<ApiResource<ResultType>> asLiveData() {
             return LiveDataReactiveStreams.fromPublisher(result);
        }
    

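    In theory, this would work perfectly, since our ViewModel wouldn't have to convert Observable emissions into LiveData emissions or implement lifecycle logic for Observables in the view. Unfortunately, this stream is rebuilt on every configuration change, because the LiveData is disposed in every onDestroy call (regardless of whether isFinishing is true or false). So we either have to manage the lifecycle of this stream ourselves, which defeats the purpose of using it in the first place, or accept duplicate calls every time the device is rotated.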
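
To make note 1 concrete, here is a small runnable sketch of a response object that persists itself. It is not the production code: InMemoryDatabase is a stand-in I made up for the Room AppDatabase so the example runs without Android, and UserResponse and SaveDemo here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the Room AppDatabase (assumption, only for this sketch).
final class InMemoryDatabase {
    final Map<Long, String> userNames = new HashMap<>();
}

// Same shape as the BaseResponse interface above, but taking the stand-in database.
interface BaseResponse {
    void saveResponseToDb(InMemoryDatabase db);
}

// Hypothetical response object: it knows how to persist itself.
final class UserResponse implements BaseResponse {
    final long id;
    final String name;

    UserResponse(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void saveResponseToDb(InMemoryDatabase db) {
        db.userNames.put(id, name);
    }
}

final class SaveDemo {
    // Mirrors the default saveCallResult: it delegates to the response,
    // so no per-resource save logic is needed.
    static <R extends BaseResponse> void saveCallResult(R response, InMemoryDatabase db) {
        response.saveResponseToDb(db);
    }
}
```

For example, SaveDemo.saveCallResult(new UserResponse(7L, "Ada"), db) stores the name under key 7 without the repository knowing anything about the user table.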

Here is an example of a UserRepository creating an instance of ApiRepositorySource:

@Singleton
public class UserRepository {

    private final RetrofitApi retrofitApi;
    private final AppDatabase appDatabase;

    @Inject
    UserRepository(RetrofitApi retrofitApi, AppDatabase appDatabase) {
        this.retrofitApi = retrofitApi;
        this.appDatabase = appDatabase;
    }

    public Observable<ApiResource<User>> getUser(long userId) {
        return new ApiRepositorySource<UserResponse, User>(appDatabase) {

            @Override
            protected boolean shouldFetch() {
                return true;
            }

            @NonNull
            @Override
            protected Flowable<User> loadFromDb() {
                return appDatabase.userDao().getUserFlowable(userId);
            }

            @NonNull
            @Override
            protected Observable<UserResponse> createCall() {
                return retrofitApi.getUserById(userId);
            }
        }.asObservable();
    }

}