领域中的线程 - 干净的代码架构
Threading in Realm - Clean Code Architecture
我在 Android 和 Dagger with Realm 上使用 Clean Code Architecture,但我找不到使它们协同工作的方法。
底线是,我总是得到:
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
@Override
public Observable<List<Item>> fetchAllItems() {
final List<Item> items = new ArrayList<>();
final Realm realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());
final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);
return queryCheck.findAll()
.asObservable()
.flatMap(new Func1<RealmResults<Checkboxes>, Observable<RealmResults<ImageItem>>>() {
@Override
public Observable<RealmResults<ImageItem>> call(RealmResults<Checkboxes> checkboxes) {
for (Checkboxes checks : checkboxes)
items.add(checks);
return queryImage.findAll().asObservable();
}
})
.flatMap(new Func1<RealmResults<ImageItem>, Observable<List<Item>>>() {
@Override
public Observable<List<Item>> call(RealmResults<ImageItem> imageItems) {
for (ImageItem images : imageItems)
items.add(images);
return Observable.create(new Observable.OnSubscribe<List<Item>>() {
@Override
public void call(Subscriber<? super List<Item>> subscriber) {
subscriber.onNext(items);
}
});
}
});
}
public class GetItemsUseCase extends UseCase {
private final ItemsRepository itemsRepository;
@Inject
public GetItemsUseCase(ItemsRepository itemsRepository, ThreadExecutor threadExecutor, PostExecutionThread postExecutionThread) {
super(threadExecutor, postExecutionThread);
this.itemsRepository = itemsRepository;
}
@Override
protected Observable buildUseCaseObservable() {
return this.itemsRepository.fetchAllItems();
}
}
和 UseCase:
public void execute(Subscriber UseCaseSubscriber) {
this.subscription = this.buildUseCaseObservable()
.subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.getScheduler())
.subscribe(UseCaseSubscriber);
}
我不确定是否有任何方法可以让 Realm 使用这种模式。
整个项目在我的 GitHub: https://github.com/leonardo2204/materialnotes/tree/bug_thread_realm
上开源
编辑:
Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:456)
at io.realm.RealmResults.addChangeListener(RealmResults.java:926)
at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:147)
at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:131)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)
您需要使用 realm.copyFromRealm()
在 RealmResults 中分离您的 Realm 对象。
编辑:它说你的问题是你的 Realm 实例与你订阅的地方不在同一个线程上,如果你订阅 Schedulers.from(threadExecutor)
这是公平的,但你正在初始化这个查询UI 线程。
asObservable()
无论如何都不会在后台线程上工作,只能在 UI 线程上工作——因为它需要添加一个变化监听器,而变化监听器需要主循环器来接收更新.
老实说,按照这个速度,您可以这样做(并失去自动更新):
final List<Item> items = new ArrayList<>();
Realm realm = null;
try {
realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());
final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);
RealmResults<Checkboxes> checkBoxes = queryCheck.findAll();
RealmResults<ImageItem> imageItems = queryImage.findAll();
for(Checkboxes checkbox : checkBoxes) {
items.add(realm.copyFromRealm(checkbox));
}
for(ImageItem checkbox : checkBoxes) {
items.add(realm.copyFromRealm(checkbox));
}
} finally {
if(realm != null) {
realm.close();
}
}
return Observable.just(items);
或者只是重新考虑您完全这样做的方式,因为最佳解决方案是使用 UI 线程中的 findAllAsync()
,并且如果您可以在一个结果列表中获得两个列表.
EDIT2:值得注意的是 asObservable()
的预期用途是替换 addChangeListener()
,但您只能 "observe changes" 循环线程上的领域(通常是 UI线程)。
所以asObservable()
的预期用途如下
private Subscription readFromEditText() {
return RxTextView.textChanges(editText).switchMap(charSequence -> {
String selectedName = charSequence.toString();
RealmQuery<Dog> query = realm.where(Dog.class);
if(selectedName != null && !"".equals(selectedName)) {
query = query.contains(DogFields.NAME, selectedName, Case.INSENSITIVE);
}
return query.findAllSortedAsync(DogFields.NAME)
.asObservable();
}).filter(RealmResults::isLoaded) //filter async realm query
.subscribe(dogs -> adapter.updateData(dogs));
}
我在 Android 和 Dagger with Realm 上使用 Clean Code Architecture,但我找不到使它们协同工作的方法。 底线是,我总是得到:
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
@Override
public Observable<List<Item>> fetchAllItems() {
final List<Item> items = new ArrayList<>();
final Realm realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());
final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);
return queryCheck.findAll()
.asObservable()
.flatMap(new Func1<RealmResults<Checkboxes>, Observable<RealmResults<ImageItem>>>() {
@Override
public Observable<RealmResults<ImageItem>> call(RealmResults<Checkboxes> checkboxes) {
for (Checkboxes checks : checkboxes)
items.add(checks);
return queryImage.findAll().asObservable();
}
})
.flatMap(new Func1<RealmResults<ImageItem>, Observable<List<Item>>>() {
@Override
public Observable<List<Item>> call(RealmResults<ImageItem> imageItems) {
for (ImageItem images : imageItems)
items.add(images);
return Observable.create(new Observable.OnSubscribe<List<Item>>() {
@Override
public void call(Subscriber<? super List<Item>> subscriber) {
subscriber.onNext(items);
}
});
}
});
}
public class GetItemsUseCase extends UseCase {
private final ItemsRepository itemsRepository;
@Inject
public GetItemsUseCase(ItemsRepository itemsRepository, ThreadExecutor threadExecutor, PostExecutionThread postExecutionThread) {
super(threadExecutor, postExecutionThread);
this.itemsRepository = itemsRepository;
}
@Override
protected Observable buildUseCaseObservable() {
return this.itemsRepository.fetchAllItems();
}
}
和 UseCase:
public void execute(Subscriber UseCaseSubscriber) {
this.subscription = this.buildUseCaseObservable()
.subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.getScheduler())
.subscribe(UseCaseSubscriber);
}
我不确定是否有任何方法可以让 Realm 使用这种模式。 整个项目在我的 GitHub: https://github.com/leonardo2204/materialnotes/tree/bug_thread_realm
上开源编辑:
Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:456)
at io.realm.RealmResults.addChangeListener(RealmResults.java:926)
at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:147)
at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:131)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:9860)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:94)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)
您需要使用 realm.copyFromRealm()
在 RealmResults 中分离您的 Realm 对象。
编辑:它说你的问题是你的 Realm 实例与你订阅的地方不在同一个线程上,如果你订阅 Schedulers.from(threadExecutor)
这是公平的,但你正在初始化这个查询UI 线程。
asObservable()
无论如何都不会在后台线程上工作,只能在 UI 线程上工作——因为它需要添加一个变化监听器,而变化监听器需要主循环器来接收更新.
老实说,按照这个速度,您可以这样做(并失去自动更新):
final List<Item> items = new ArrayList<>();
Realm realm = null;
try {
realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());
final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);
RealmResults<Checkboxes> checkBoxes = queryCheck.findAll();
RealmResults<ImageItem> imageItems = queryImage.findAll();
for(Checkboxes checkbox : checkBoxes) {
items.add(realm.copyFromRealm(checkbox));
}
for(ImageItem checkbox : checkBoxes) {
items.add(realm.copyFromRealm(checkbox));
}
} finally {
if(realm != null) {
realm.close();
}
}
return Observable.just(items);
或者只是重新考虑您完全这样做的方式,因为最佳解决方案是使用 UI 线程中的 findAllAsync()
,并且如果您可以在一个结果列表中获得两个列表.
EDIT2:值得注意的是 asObservable()
的预期用途是替换 addChangeListener()
,但您只能 "observe changes" 循环线程上的领域(通常是 UI线程)。
所以asObservable()
的预期用途如下
private Subscription readFromEditText() {
return RxTextView.textChanges(editText).switchMap(charSequence -> {
String selectedName = charSequence.toString();
RealmQuery<Dog> query = realm.where(Dog.class);
if(selectedName != null && !"".equals(selectedName)) {
query = query.contains(DogFields.NAME, selectedName, Case.INSENSITIVE);
}
return query.findAllSortedAsync(DogFields.NAME)
.asObservable();
}).filter(RealmResults::isLoaded) //filter async realm query
.subscribe(dogs -> adapter.updateData(dogs));
}