RxJava Observable 从不执行 onCompleted

RxJava Observable never perform onCompleted

我正在使用 RxJavaRealm 来查询移动数据库。我需要通知我的视图在查询完成后更新列表,但似乎 flatMapdoOnNext 有效,但它永远不会继续完成。我需要一个触发器才能知道它结束了。

Realm.getDefaultInstance().asObservable().flatMap((realm) ->
        realm.where(FileDoc.class).isNull("parent").isNotNull("updatedDate").findAllSortedAsync("name").asObservable()
                .flatMap((files) -> {
                    documents.get("root").clear();
                    documents.get("root").addAll(realm.copyFromRealm(files));
                    return realm.where(Folder.class).isNull("parent").findAllSortedAsync("name").asObservable().doOnCompleted(() -> {
                        Log.e(TAG, "Barr!");
                    });
                })
                .doOnNext((folders) -> {
                    documents.get("root").addAll(realm.copyFromRealm(folders));
                })).doOnCompleted(() -> Log.e(TAG, "Fooo!"))
        .doOnError(error -> handleErrorEvent(error))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe();

这里 Barr!Fooo! 从未打印过...有人知道吗?


已更新

Realm.getDefaultInstance().asObservable().first().flatMap((realm) ->
                realm.where(IncidentTemplate.class).equalTo("deleted", false).findAllAsync().asObservable()
                        .filter(results -> results.isLoaded())
                        .first()
                        .doOnNext((files) -> {
                            if (!incidents.containsKey("INCIDENT")) {
                                incidents.put("INCIDENT", new ArrayList<>());
                            }

                            incidents.get("INCIDENT").clear();
                            incidents.get("INCIDENT").addAll(realm.copyFromRealm(files));

                        })
                        .doOnTerminate(() -> {
                            Log.e(TAG, "Closing realm");
                            realm.close();
                        }))
                .doOnCompleted(() -> {
                    emitStoreChange(new CobaltStore.CobaltStoreChangeEvent());
                    Log.e(TAG, "EMIT INCIDENT");
                })
                .doOnError(error -> handleErrorEvent(error))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(AndroidSchedulers.from(backgroundLooper))
                .subscribe();

1.) Realm.getDefaultInstance().asObservable()

您获得了一个您永远无法关闭的本地 Realm 实例,因此您正在泄漏内存。

2.) 这不是 asObservable() 设计的工作方式。

OnClickListener 完成并发出终端事件是否有意义?

例如,您单击按钮一次,然后可观察对象 "finishes" 并且永远不会再发出任何事件,即使用户之后单击按钮也是如此?

当然不是,因此您也不应期望从 Realm 的 RealmChangeListener 得到它;这是绑定到 Realm 或 RealmResults 的内容,因此您可以收听任何未来对 Realm 或其特定 table.

的写入

解决方案可能只是使用类似

的东西
Observable.fromCallable(() => {
    try(Realm realm = Realm.getDefaultInstance()) {
        RealmResults<FileDoc> docs = realm.where(FileDoc.class)
             .isNull("parent")
             .isNotNull("updatedDate")
             .findAllSorted("name");
        documents.get("root").clear(); // <-- shouldn't you modify a new copy and return this?
        List<FileDoc> unmanagedDocs = realm.copyFromRealm(files);
        documents.get("root").addAll(unmanagedDocs);
        return unmanagedDocs;
    }
}

或者正如您指出的那样,.filter((data) -> data.isLoaded()).first() 也可以,尽管我没有想到这一点。