使用 Retrofit 和 Realm 在 RxJava 中纠正流程

Correct flow in RxJava with Retrofit and Realm

我正在结合 RxJava 和 Retrofit 实现网络 API,我使用 Realm 作为我的数据库。我几乎可以正常工作,但我想知道这是否是正确的方法和事件流程。所以,这里是 RetrofitApiManager.

public class RetrofitApiManager {

    private static final String BASE_URL = "***";

    private final ShopApi shopApi;

    public RetrofitApiManager(OkHttpClient okHttpClient) {

        // GSON INITIALIZATION

        Retrofit retrofit = new Retrofit.Builder()
                .client(okHttpClient)
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create(gson))
                .baseUrl(BASE_URL)
                .build();

        shopApi = retrofit.create(ShopApi.class);
    }

    public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    realm.close();
                })
                .flatMap(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    realm.close();
                    return results;
                });
    }
}

这是在 Fragment.

中获取 RealmResults<Shop> 的调用
realm.where(Shop.class)
        .findAllAsync()
        .asObservable()
        .filter(RealmResults::isLoaded)
        .first()
        .flatMap(shops -> 
                shops.isEmpty() ? retrofitApiManager.getShops() : Observable.just(shops))
        .subscribe(
                shops -> initRecyclerView(),
                throwable -> processError(throwable));

这是我的问题:

  1. 像上面的例子那样链式事件的正确方法是正确的还是我应该以不同的方式管理它们?

  2. 可以在 getShops() 方法中使用 Realm 实例并在那里关闭 i 还是将它作为参数传递然后以某种方式管理它会更好?虽然,这个想法似乎对线程有点问题,并且总是在正确的时间调用Realm.close()

1) 我会尝试在后台线程上做尽可能多的事情,现在你正在 UI 线程上做很多工作。

2)

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.insertOrUpdate(response.shops));
                    } // auto-close
                })
                .flatMap(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    } // auto-close
                    return results;
                });
    }

所有Realm 数据都是延迟加载的,因此只有在Realm 实例打开时才可用,因此检索后关闭它很可能无法正常工作。在你的情况下,虽然你在主线程上进行平面映射,但很可能那里已经有一个打开的实例。

如果您愿意,可以使用 copyFromRealm() 获取非托管数据,这些数据可以跨线程移动并且不再连接到 Realm,但它们也会失去实时更新功能并占用更多内存。

它可能会这样做:

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    } // auto-close
                })
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(response -> {
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    return results;
                });

或者,您可以将网络请求视为副作用,只依赖于 Realm 在发生变化时通知您(更好的方法是 IMO,因为您将网络与数据库访问分开,例如存储库模式是关于什么的)

public Observable<RealmResults<Shop>> getShops() {
    // Realm will automatically notify this observable whenever data is saved from the network
    return realm.where(Shop.class).findAllAsync().asObservable()
            .filter(RealmResults::isLoaded)
            .doOnNext(results -> {
                if (results.size() == 0) {
                    loadShopsFromNetwork();
                }
            }); 
}

private void loadShopsFromNetwork() {
    shopApi.getShops()
            .subscribeOn(Schedulers.io())
            .subscribe(response -> {
                try(Realm realm = Realm.getDefaultInstance()) {
                    realm.executeTransaction(r -> r.insertOrUpdate(response.shops));
                } // auto-close
            });
}

Christian Melchior 在他的回答中提到的内容非常有道理,应该可以解决您手头的问题,但最终这种方法可能会引入其他问题。

在一个好的架构中,所有主要模块(或库)都应该与其余代码隔离开来。由于 Realm、RealmObject 或 RealmResult 不能跨线程传递,因此将 Realm 和 Realm 相关操作与代码的其余部分隔离开来更为重要。

对于您的每个 jsonModel class,您应该有一个 realmModel class 和一个 DAO(数据访问对象)。这里的想法是 class 的 DAO class none 必须知道或访问 realmModel 或 Realm。 DAO class 采用 jsonModel,转换为 realmModel,执行 read/write/edit/remove 操作,对于读取操作,DAO 将 realmModel 转换为 jsonModel 并使用它 returns。

这种方式很容易维护Realm,避免所有与Thread相关的问题,易于测试和调试。

这是一篇关于具有良好架构的 Realm 最佳实践的文章https://medium.com/@Viraj.Tank/realm-integration-in-android-best-practices-449919d25f2f

还有一个示例项目,演示了 Android 上的 Realm 与 MVP(模型视图展示器)、RxJava、Retrofit、Dagger、注释和测试的集成。 https://github.com/viraj49/Realm_android-injection-rx-test

在我的例子中,我似乎为 RealmRecyclerViewAdapter 定义了一个查询,如下所示:

    recyclerView.setAdapter(new CatAdapter(getContext(),
            realm.where(Cat.class).findAllSortedAsync(CatFields.RANK, Sort.ASCENDING)));

另外定义了一个条件,当满足条件时,使用 RxJava 进行 Retrofit 以下载更多内容:

    Subscription downloadCats = Observable.create(new RecyclerViewScrollBottomOnSubscribe(recyclerView))
            .filter(isScrollEvent -> isScrollEvent || realm.where(Cat.class).count() <= 0)
            .switchMap(isScrollEvent -> catService.getCats().subscribeOn(Schedulers.io()))  // RETROFIT
            .retry()
            .subscribe(catsBO -> {
                try(Realm outRealm = Realm.getDefaultInstance()) {
                    outRealm.executeTransaction((realm) -> {
                        Cat defaultCat = new Cat();
                        long rank;
                        if(realm.where(Cat.class).count() > 0) {
                            rank = realm.where(Cat.class).max(Cat.Fields.RANK.getField()).longValue();
                        } else {
                            rank = 0;
                        }
                        for(CatBO catBO : catsBO.getCats()) {
                            defaultCat.setId(catBO.getId());
                            defaultCat.setRank(++rank);
                            defaultCat.setSourceUrl(catBO.getSourceUrl());
                            defaultCat.setUrl(catBO.getUrl());
                            realm.insertOrUpdate(defaultCat);
                        }
                    });
                }
            }, throwable -> {
                Log.e(TAG, "An error occurred", throwable);
            });

例如,这是基于编辑文本输入的搜索:

    Subscription filterDogs = RxTextView.textChanges(editText)
                     .switchMap((charSequence) -> 
                           realm.where(Dog.class)
                                .contains(DogFields.NAME, charSequence.toString())
                                .findAllAsyncSorted(DogFields.NAME, Sort.ASCENDING)
                                .asObservable())
                     .filter(RealmResults::isLoaded) 
                     .subscribe(dogs -> realmRecyclerAdapter.updateData(dogs));