Rxjava + 从不正确的线程访问领域

Rxjava + Realm access from incorrect thread

我从 Realm

收到这个异常 reading/writing

06-19 09:49:26.352 11404-11404/****** E/ContentValues: loadData: OnError Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created. java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created. at io.realm.BaseRealm.checkIfValid(BaseRealm.java:385) at io.realm.RealmResults.isLoaded(RealmResults.java:115) at io.realm.OrderedRealmCollectionImpl.size(OrderedRealmCollectionImpl.java:307) at io.realm.RealmResults.size(RealmResults.java:60) at java.util.AbstractCollection.isEmpty(AbstractCollection.java:86) at /****** .lambda$loadData[=13=](SplashPresenter.java:42) at /****** $$Lambda.test(Unknown Source) at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:45) at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111) at io.reactivex.internal.operators.observable.ObservableDelay$DelayObserver.run(ObservableDelay.java:84) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)

这是代码:

  mSubscribe = Observable.just(readData())
            .delay(DELAY, TimeUnit.SECONDS)
            .filter(value -> !value.isEmpty())
            .switchIfEmpty(createRequest())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
                getView().hideLoading();
                writeData(data);
            }, 
           (throwable -> {
            }));

读取数据

  private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");

    defaultInstance.close();
    return title;
}

写入数据

private void writeData(List<CategoryModel> categoryModels) {

        try {
            Realm defaultInstance = Realm.getDefaultInstance();
            defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
            defaultInstance.close();
        } finally {
            getView().notifyActivity(categoryModels);
        }
    }

如何使用适当的线程遵循此逻辑?

您需要将所需数据从领域对象提取到 POJO 中,并使用映射运算符发出 POJO,以便视图对象可以在 android 主线程上使用 pojo 使用来自领域的数据进行更新。

The only rule to using Realm across threads is to remember that Realm, RealmObject or RealmResults instances cannot be passed across threads

当你想从不同的线程访问相同的数据时,你应该简单地获取一个新的领域实例(即Realm.getDefaultInstance())并通过一个查询(然后在线程结束时关闭 Realm)。

对象将映射到磁盘上的相同数据,并且可以从任何线程读取和写入!您还可以 运行 在后台线程上使用 realm.executeTransactionAsync() 的代码,例如 .

您只能在一个事务中操作 Realm 对象,或者只能在您 read/write 的线程中操作这些对象。在您的情况下,您从 readData 方法获取 RealmResult 并使用 RxJava 切换导致异常的线程。使用 copyFromRealm 从领域获取数据,这将 return 它们作为普通对象而不是领域对象。

How can i follow this logic using the proper threads?

通过不尝试在 Schedulers.io() 上阅读您的 UI 线程(Realm 提供自动更新延迟加载代理视图,为您在 UI 线程上的数据提供更改通知,毕竟)。


所以不用这个

 mSubscribe = Observable.just(readData())
        .delay(DELAY, TimeUnit.SECONDS)
        .filter(value -> !value.isEmpty())
        .switchIfEmpty(createRequest())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
            getView().hideLoading();
            writeData(data);
        }, 
       (throwable -> {
        }));

private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");

    defaultInstance.close();
    return title;
}

private void writeData(List<CategoryModel> categoryModels) {
    try {
        Realm defaultInstance = Realm.getDefaultInstance();
        defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
        defaultInstance.close();
    } finally {
        getView().notifyActivity(categoryModels);
    }
}

你应该有这样的东西

private Observable<List<CategoryModel>> readData() { // Flowable with LATEST might be better.
    return io.reactivex.Observable.create(new ObservableOnSubscribe<List<CategoryModel>>() {
        @Override
        public void subscribe(ObservableEmitter<List<CategoryModel>> emitter)
                throws Exception {
            final Realm observableRealm = Realm.getDefaultInstance();
            final RealmResults<CategoryModel> results = observableRealm.where(CategoryModel.class).findAllSortedAsync("title");
            final RealmChangeListener<RealmResults<CategoryModel>> listener = results -> {
                if(!emitter.isDisposed() && results.isLoaded()) {
                    emitter.onNext(results);
                }
            };

            emitter.setDisposable(Disposables.fromRunnable(() -> {
                if(results.isValid()) {
                    results.removeChangeListener(listener);
                }
                observableRealm.close();
            }));
            results.addChangeListener(listener);
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(AndroidSchedulers.mainThread());
}

private void setSubscription() {
    mSubscribe = readData()
            .doOnNext((list) -> {
                if(list.isEmpty()) {
                    Single.fromCallable(() -> this::createRequest)
                            .subscribeOn(Schedulers.io())
                            .subscribe((data) -> {
                                writeData(data);
                            });
                }
            }).subscribe(data -> {
                if(!data.isEmpty()) {
                    getView().hideLoading();
                    getView().notifyActivity(data);
                }
            }, throwable -> {
                throwable.printStackTrace();
            });
}

private void writeData(List<CategoryModel> categoryModels) {
    try(Realm r = Realm.getDefaultInstance()) {
        r.executeTransaction(realm -> realm.insertOrUpdate(categoryModels));
    }
}

void unsubscribe() {
    mSubscribe.dispose();
    mSubscribe = null;
}

这样(如果我没有搞砸任何事情),你最终会得到描述的反应数据层 here and here,除了没有映射出整个结果的额外开销。

编辑:

从 Realm 4.0 开始,可以将 RealmResults 直接公开为 Flowable(在 UI 线程或后台循环线程上)。

public Flowable<List<MyObject>> getLiveResults() {
    try(Realm realm = Realm.getDefaultInstance()) {
        return realm.where(MyObject.class) 
                    .findAllAsync()
                    .asFlowable()
                    .filter(RealmResults::isLoaded);
    }
}