Threading in Realm - Clean Code Architecture

I'm using Clean Code Architecture on Android with Dagger and Realm, but I can't find a way to make them work together. The bottom line is that I always get:

java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

FetchItemsRepo:

    @Override
    public Observable<List<Item>> fetchAllItems() {
        final List<Item> items = new ArrayList<>();
        final Realm realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());

        final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
        final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);

        return queryCheck.findAll()
                .asObservable()
                .flatMap(new Func1<RealmResults<Checkboxes>, Observable<RealmResults<ImageItem>>>() {
                    @Override
                    public Observable<RealmResults<ImageItem>> call(RealmResults<Checkboxes> checkboxes) {
                        for (Checkboxes checks : checkboxes)
                            items.add(checks);

                        return queryImage.findAll().asObservable();
                    }
                })
                .flatMap(new Func1<RealmResults<ImageItem>, Observable<List<Item>>>() {
                    @Override
                    public Observable<List<Item>> call(RealmResults<ImageItem> imageItems) {
                        for (ImageItem images : imageItems)
                            items.add(images);

                        return Observable.create(new Observable.OnSubscribe<List<Item>>() {
                            @Override
                            public void call(Subscriber<? super List<Item>> subscriber) {
                                subscriber.onNext(items);
                            }
                        });
                    }
                });
    }

GetItemsUseCase:

public class GetItemsUseCase extends UseCase {

    private final ItemsRepository itemsRepository;

    @Inject
    public GetItemsUseCase(ItemsRepository itemsRepository, ThreadExecutor threadExecutor, PostExecutionThread postExecutionThread) {
        super(threadExecutor, postExecutionThread);
        this.itemsRepository = itemsRepository;
    }

    @Override
    protected Observable buildUseCaseObservable() {
        return this.itemsRepository.fetchAllItems();
    }
}

UseCase:

public void execute(Subscriber UseCaseSubscriber) {
    this.subscription = this.buildUseCaseObservable()
            .subscribeOn(Schedulers.from(threadExecutor))
            .observeOn(postExecutionThread.getScheduler())
            .subscribe(UseCaseSubscriber);
}

I'm not sure whether there is any way to make Realm work with this pattern. The whole project is open source on my GitHub: https://github.com/leonardo2204/materialnotes/tree/bug_thread_realm

EDIT:

Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
 java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
 at io.realm.BaseRealm.checkIfValid(BaseRealm.java:456)
 at io.realm.RealmResults.addChangeListener(RealmResults.java:926)
 at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:147)
 at io.realm.rx.RealmObservableFactory.call(RealmObservableFactory.java:131)
 at rx.Observable.unsafeSubscribe(Observable.java:9860)
 at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
 at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
 at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
 at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
 at rx.Observable.unsafeSubscribe(Observable.java:9860)
 at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
 at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
 at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
 at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
 at rx.Observable.unsafeSubscribe(Observable.java:9860)
 at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:94)
 at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
 at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:104)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
 at java.lang.Thread.run(Thread.java:841)

You need to use realm.copyFromRealm() to detach your Realm objects from the RealmResults.

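EDIT: The error says that your Realm instance is not on the same thread as the one you subscribe on. That is fair: you subscribe on Schedulers.from(threadExecutor), but you initialize the query on the UI thread.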

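asObservable() won't work on a background thread anyway, only on the UI thread, because it needs to add a change listener, and a change listener needs the main looper to receive updates.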
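To make the thread-confinement rule concrete, here is a minimal plain-Java sketch. It does not use Realm and is not how Realm is implemented: ConfinedItem, detach() and readOnWorker() are hypothetical names that only mimic the behavior of a managed object, of copyFromRealm(), and of a subscribeOn() worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadConfinementSketch {

    /** Hypothetical stand-in for a managed object: usable only on its creator thread. */
    static final class ConfinedItem {
        private final Thread owner = Thread.currentThread();
        private final String name;

        ConfinedItem(String name) {
            this.name = name;
        }

        String getName() {
            // Same idea as the checkIfValid() frame in the stack trace above.
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("access from incorrect thread");
            }
            return name;
        }

        /** Plays the role of copyFromRealm(): returns plain data with no thread affinity. */
        String detach() {
            return getName();
        }
    }

    /** Runs the read on a separate worker thread and reports what happened. */
    static String readOnWorker(Callable<String> read) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(read).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        ConfinedItem item = new ConfinedItem("note"); // created on the calling thread

        // Touching the confined object from the worker thread fails.
        System.out.println(readOnWorker(item::getName));

        // A copy detached on the owner thread can cross threads safely.
        String detached = item.detach();
        System.out.println(readOnWorker(() -> detached));
    }
}
```

The first read fails for the same reason as your query: the object was created on one thread and touched on another. The second read succeeds because only detached data crosses the thread boundary.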

Honestly, at this rate, you could just do this (and lose auto-updating):

    final List<Item> items = new ArrayList<>();
    Realm realm = null;
    try {
        // Open, query, and close the Realm on the same thread.
        realm = Realm.getInstance(new RealmConfiguration.Builder(context).build());

        final RealmQuery<Checkboxes> queryCheck = realm.where(Checkboxes.class);
        final RealmQuery<ImageItem> queryImage = realm.where(ImageItem.class);

        RealmResults<Checkboxes> checkBoxes = queryCheck.findAll();
        RealmResults<ImageItem> imageItems = queryImage.findAll();

        // copyFromRealm() returns detached copies that can be passed to other threads.
        for (Checkboxes checkbox : checkBoxes) {
            items.add(realm.copyFromRealm(checkbox));
        }
        for (ImageItem imageItem : imageItems) {
            items.add(realm.copyFromRealm(imageItem));
        }
    } finally {
        // Always release the Realm instance, even if a query throws.
        if (realm != null) {
            realm.close();
        }
    }
    return Observable.just(items);

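Or just rethink the way you're doing this altogether, because the optimal solution would be to use findAllAsync() from the UI thread and, if you can, to get the two lists in one result list.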

EDIT2: It's worth noting that the intended use of asObservable() is to replace addChangeListener(), but you can only "observe changes" of a Realm on looper threads (typically the UI thread).

So the intended use of asObservable() is as follows:

private Subscription readFromEditText() {
    return RxTextView.textChanges(editText).switchMap(charSequence -> {
        String selectedName = charSequence.toString();
        RealmQuery<Dog> query = realm.where(Dog.class);
        if(selectedName != null && !"".equals(selectedName)) {
            query = query.contains(DogFields.NAME, selectedName, Case.INSENSITIVE);
        }
        return query.findAllSortedAsync(DogFields.NAME)
                    .asObservable();
    }).filter(RealmResults::isLoaded) //filter async realm query
      .subscribe(dogs -> adapter.updateData(dogs));
}