RxJava + Realm access from incorrect thread
I get this exception when reading/writing from Realm:
06-19 09:49:26.352 11404-11404/****** E/ContentValues: loadData: OnError Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:385)
at io.realm.RealmResults.isLoaded(RealmResults.java:115)
at io.realm.OrderedRealmCollectionImpl.size(OrderedRealmCollectionImpl.java:307)
at io.realm.RealmResults.size(RealmResults.java:60)
at java.util.AbstractCollection.isEmpty(AbstractCollection.java:86)
at /****** .lambda$loadData$0(SplashPresenter.java:42)
at /****** $$Lambda.test(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:45)
at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111)
at io.reactivex.internal.operators.observable.ObservableDelay$DelayObserver.run(ObservableDelay.java:84)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
Here is the code:
mSubscribe = Observable.just(readData())
        .delay(DELAY, TimeUnit.SECONDS)
        .filter(value -> !value.isEmpty())
        .switchIfEmpty(createRequest())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
                    getView().hideLoading();
                    writeData(data);
                },
                (throwable -> {
                }));
Reading data
private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");
    defaultInstance.close();
    return title;
}
Writing data
private void writeData(List<CategoryModel> categoryModels) {
    try {
        Realm defaultInstance = Realm.getDefaultInstance();
        defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
        defaultInstance.close();
    } finally {
        getView().notifyActivity(categoryModels);
    }
}
How can I follow this logic using the proper threads?
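You need to extract the required data from the Realm objects into POJOs and emit those POJOs using the map operator, so that the view objects on the Android main thread can be updated from the POJOs instead of from the Realm objects themselves.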
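The idea can be illustrated without Realm at all. The following is a minimal plain-Java sketch, not Realm's API: ManagedCategory is a hypothetical stand-in for a thread-confined Realm object, CategoryPojo is a hypothetical plain copy, and onOtherThread is a helper invented for this demo. It shows that the managed object fails when touched from another thread, while a copy made on the owning thread can be handed over safely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PojoHandoffDemo {

    /** Hypothetical stand-in for a managed object: usable only on its creating thread. */
    static final class ManagedCategory {
        private final Thread owner = Thread.currentThread();
        private final String title;

        ManagedCategory(String title) {
            this.title = title;
        }

        String getTitle() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("access from incorrect thread");
            }
            return title;
        }
    }

    /** Plain immutable copy with no thread affinity. */
    static final class CategoryPojo {
        final String title;

        CategoryPojo(String title) {
            this.title = title;
        }
    }

    /** Copies every managed object into a POJO; must be called on the owning thread. */
    static List<CategoryPojo> toPojos(List<ManagedCategory> managed) {
        List<CategoryPojo> copies = new ArrayList<>();
        for (ManagedCategory category : managed) {
            copies.add(new CategoryPojo(category.getTitle()));
        }
        return copies;
    }

    /** Runs the task on a new thread and returns its result or a failure message. */
    static String onOtherThread(Supplier<String> task) {
        String[] outcome = new String[1];
        Thread worker = new Thread(() -> {
            try {
                outcome[0] = task.get();
            } catch (IllegalStateException e) {
                outcome[0] = "failed: " + e.getMessage();
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        List<ManagedCategory> managed = new ArrayList<>();
        managed.add(new ManagedCategory("Books"));

        // Touching the managed object from another thread fails.
        System.out.println(onOtherThread(() -> managed.get(0).getTitle()));

        // Copy on the owning thread first, then hand the copies over.
        List<CategoryPojo> copies = toPojos(managed);
        System.out.println(onOtherThread(() -> copies.get(0).title));
    }
}
```

In an RxJava chain the copying step would sit in a map operator that runs on the thread owning the Realm objects, before any operator moves the data to another thread.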
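When you want to access the same data from a different thread, you should simply obtain a new Realm instance (i.e. Realm.getDefaultInstance()) and get your objects through a query (then close the Realm at the end of the thread).
The objects will map to the same data on disk, and will be readable and writable from any thread! You can also run your code on a background thread using realm.executeTransactionAsync().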
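The open, query, close pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: Session is a hypothetical thread-bound handle loosely modelled on a Realm instance, DISK is a hypothetical shared list playing the role of the file on disk, and none of these names belong to Realm's API. Each thread opens its own handle, queries the shared data, and closes the handle when done.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class PerThreadInstanceDemo {

    /** Shared backing data, playing the role of the file on disk. */
    private static final List<String> DISK = new CopyOnWriteArrayList<>();

    /** Hypothetical handle that may only be used on the thread that opened it. */
    static final class Session implements AutoCloseable {
        private final Thread owner = Thread.currentThread();
        private boolean closed;

        static Session open() {
            return new Session();
        }

        private void check() {
            if (closed || Thread.currentThread() != owner) {
                throw new IllegalStateException("closed or used from incorrect thread");
            }
        }

        void insert(String title) {
            check();
            DISK.add(title);
        }

        int count() {
            check();
            return DISK.size();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Opens a handle on the calling thread, queries, and closes the handle again. */
    static int countOnThisThread() {
        try (Session session = Session.open()) {
            return session.count();
        }
    }

    /** Runs countOnThisThread() on a fresh worker thread and returns its result. */
    static int countOnWorker() {
        int[] result = new int[1];
        Thread worker = new Thread(() -> result[0] = countOnThisThread());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        // The main thread writes through its own handle.
        try (Session session = Session.open()) {
            session.insert("Books");
            session.insert("Music");
        }
        // The worker never receives the main thread's handle or objects;
        // it opens its own handle and sees the same underlying data.
        System.out.println(countOnWorker()); // prints 2
    }
}
```

The try-with-resources form guarantees the handle is closed on the thread that opened it, even if the query throws.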
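You can only manipulate Realm objects inside a transaction, or on the thread on which you read or wrote them. In your case, you get a RealmResults from the readData method and then switch threads with RxJava, which causes the exception. Use copyFromRealm to get the data out of Realm; it returns plain objects instead of Realm objects.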
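One detail worth spelling out is where the object is created. Java evaluates method arguments before the call, so in Observable.just(readData()) the readData() call runs on the calling thread, and subscribeOn cannot move it. The plain-Java sketch below illustrates only that point; producerThread and runOnWorker are hypothetical helpers, and the deferred variant corresponds roughly to what RxJava's Observable.fromCallable does. Deferring alone would not fix the code in the question, because the chain still hands the result to other threads afterwards.

```java
import java.util.concurrent.Callable;

public class EagerVsDeferredDemo {

    /** Returns the name of the thread that actually produced the value. */
    static String producerThread() {
        return Thread.currentThread().getName();
    }

    /** Runs the task on a new thread named "worker" and returns its result. */
    static String runOnWorker(Callable<String> task) {
        String[] result = new String[1];
        Thread worker = new Thread(() -> {
            try {
                result[0] = task.call();
            } catch (Exception e) {
                result[0] = "failed: " + e;
            }
        }, "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // Eager: the value is produced here, on the caller, before the worker exists.
        String eagerValue = producerThread();
        String eager = runOnWorker(() -> eagerValue);

        // Deferred: the worker itself invokes the producer.
        String deferred = runOnWorker(EagerVsDeferredDemo::producerThread);

        System.out.println(eager.equals(caller)); // prints true
        System.out.println(deferred);             // prints worker
    }
}
```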
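How can I follow this logic using the proper threads?
By not trying to read on Schedulers.io() for your UI thread (after all, Realm gives you auto-updating, lazy-loaded proxy views that deliver change notifications for your data on the UI thread).
So instead of this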
mSubscribe = Observable.just(readData())
        .delay(DELAY, TimeUnit.SECONDS)
        .filter(value -> !value.isEmpty())
        .switchIfEmpty(createRequest())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
                    getView().hideLoading();
                    writeData(data);
                },
                (throwable -> {
                }));
private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");
    defaultInstance.close();
    return title;
}
private void writeData(List<CategoryModel> categoryModels) {
    try {
        Realm defaultInstance = Realm.getDefaultInstance();
        defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
        defaultInstance.close();
    } finally {
        getView().notifyActivity(categoryModels);
    }
}
you should have something like this
private Observable<List<CategoryModel>> readData() { // Flowable with LATEST might be better.
    return io.reactivex.Observable.create(new ObservableOnSubscribe<List<CategoryModel>>() {
        @Override
        public void subscribe(ObservableEmitter<List<CategoryModel>> emitter)
                throws Exception {
            // Opened on the UI thread (see subscribeOn below) and kept open while subscribed.
            final Realm observableRealm = Realm.getDefaultInstance();
            final RealmResults<CategoryModel> results = observableRealm.where(CategoryModel.class).findAllSortedAsync("title");
            // The lambda parameter must not reuse the name of the local variable `results`.
            final RealmChangeListener<RealmResults<CategoryModel>> listener = updated -> {
                if(!emitter.isDisposed() && updated.isLoaded()) {
                    emitter.onNext(updated);
                }
            };
            // On dispose: stop listening and release this Realm instance.
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                if(results.isValid()) {
                    results.removeChangeListener(listener);
                }
                observableRealm.close();
            }));
            results.addChangeListener(listener);
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(AndroidSchedulers.mainThread());
}
private void setSubscription() {
    mSubscribe = readData()
            .doOnNext((list) -> {
                if(list.isEmpty()) {
                    // createRequest() is passed to switchIfEmpty() in the question, so it is
                    // assumed here to return an Observable<List<CategoryModel>>.
                    createRequest()
                            .subscribeOn(Schedulers.io())
                            .subscribe((data) -> {
                                writeData(data); // runs off the UI thread
                            }, throwable -> {
                                throwable.printStackTrace();
                            });
                }
            }).subscribe(data -> {
                if(!data.isEmpty()) {
                    getView().hideLoading();
                    getView().notifyActivity(data);
                }
            }, throwable -> {
                throwable.printStackTrace();
            });
}
private void writeData(List<CategoryModel> categoryModels) {
    // Synchronous transaction on the calling (background) thread; the Realm is closed afterwards.
    try(Realm r = Realm.getDefaultInstance()) {
        r.executeTransaction(realm -> realm.insertOrUpdate(categoryModels));
    }
}
void unsubscribe() {
    mSubscribe.dispose();
    mSubscribe = null;
}
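This way (if I didn't mess anything up), you end up with the reactive data layer described here and here, except without the additional overhead of mapping out the entire results.
EDIT:
Since Realm 4.0, it is possible to expose the RealmResults directly as a Flowable (on the UI thread, or on a background looper thread).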
// asFlowable() emits RealmResults<MyObject>, so the return type uses that element type.
public Flowable<RealmResults<MyObject>> getLiveResults() {
    try(Realm realm = Realm.getDefaultInstance()) {
        return realm.where(MyObject.class)
                .findAllAsync()
                .asFlowable()
                .filter(RealmResults::isLoaded);
    }
}