RxJava Observable 从不执行 onCompleted
RxJava Observable never perform onCompleted
我正在使用 RxJava
和 Realm
来查询移动数据库。我需要通知我的视图在查询完成后更新列表,但似乎 flatMap
和 doOnNext
有效,但它永远不会继续完成。我需要一个触发器才能知道它结束了。
Realm.getDefaultInstance().asObservable().flatMap((realm) ->
realm.where(FileDoc.class).isNull("parent").isNotNull("updatedDate").findAllSortedAsync("name").asObservable()
.flatMap((files) -> {
documents.get("root").clear();
documents.get("root").addAll(realm.copyFromRealm(files));
return realm.where(Folder.class).isNull("parent").findAllSortedAsync("name").asObservable().doOnCompleted(() -> {
Log.e(TAG, "Barr!");
});
})
.doOnNext((folders) -> {
documents.get("root").addAll(realm.copyFromRealm(folders));
})).doOnCompleted(() -> Log.e(TAG, "Fooo!"))
.doOnError(error -> handleErrorEvent(error))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
这里 Barr!
和 Fooo!
从未打印过...有人知道吗?
已更新
Realm.getDefaultInstance().asObservable().first().flatMap((realm) ->
realm.where(IncidentTemplate.class).equalTo("deleted", false).findAllAsync().asObservable()
.filter(results -> results.isLoaded())
.first()
.doOnNext((files) -> {
if (!incidents.containsKey("INCIDENT")) {
incidents.put("INCIDENT", new ArrayList<>());
}
incidents.get("INCIDENT").clear();
incidents.get("INCIDENT").addAll(realm.copyFromRealm(files));
})
.doOnTerminate(() -> {
Log.e(TAG, "Closing realm");
realm.close();
}))
.doOnCompleted(() -> {
emitStoreChange(new CobaltStore.CobaltStoreChangeEvent());
Log.e(TAG, "EMIT INCIDENT");
})
.doOnError(error -> handleErrorEvent(error))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
1.) Realm.getDefaultInstance().asObservable()
您获得了一个您永远无法关闭的本地 Realm 实例,因此您正在泄漏内存。
2.) 这不是 asObservable()
设计的工作方式。
OnClickListener
完成并发出终端事件是否有意义?
例如,您单击按钮一次,然后可观察对象 "finishes" 并且永远不会再发出任何事件,即使用户之后单击按钮也是如此?
当然不是,因此您也不应期望从 Realm 的 RealmChangeListener
得到它;这是绑定到 Realm 或 RealmResults 的内容,因此您可以收听任何未来对 Realm 或其特定 table.
的写入
解决方案可能只是使用类似
的东西
Observable.fromCallable(() => {
try(Realm realm = Realm.getDefaultInstance()) {
RealmResults<FileDoc> docs = realm.where(FileDoc.class)
.isNull("parent")
.isNotNull("updatedDate")
.findAllSorted("name");
documents.get("root").clear(); // <-- shouldn't you modify a new copy and return this?
List<FileDoc> unmanagedDocs = realm.copyFromRealm(files);
documents.get("root").addAll(unmanagedDocs);
return unmanagedDocs;
}
}
或者正如您指出的那样,.filter((data) -> data.isLoaded()).first()
也可以,尽管我没有想到这一点。
我正在使用 RxJava
和 Realm
来查询移动数据库。我需要通知我的视图在查询完成后更新列表,但似乎 flatMap
和 doOnNext
有效,但它永远不会继续完成。我需要一个触发器才能知道它结束了。
Realm.getDefaultInstance().asObservable().flatMap((realm) ->
realm.where(FileDoc.class).isNull("parent").isNotNull("updatedDate").findAllSortedAsync("name").asObservable()
.flatMap((files) -> {
documents.get("root").clear();
documents.get("root").addAll(realm.copyFromRealm(files));
return realm.where(Folder.class).isNull("parent").findAllSortedAsync("name").asObservable().doOnCompleted(() -> {
Log.e(TAG, "Barr!");
});
})
.doOnNext((folders) -> {
documents.get("root").addAll(realm.copyFromRealm(folders));
})).doOnCompleted(() -> Log.e(TAG, "Fooo!"))
.doOnError(error -> handleErrorEvent(error))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
这里 Barr!
和 Fooo!
从未打印过...有人知道吗?
已更新
Realm.getDefaultInstance().asObservable().first().flatMap((realm) ->
realm.where(IncidentTemplate.class).equalTo("deleted", false).findAllAsync().asObservable()
.filter(results -> results.isLoaded())
.first()
.doOnNext((files) -> {
if (!incidents.containsKey("INCIDENT")) {
incidents.put("INCIDENT", new ArrayList<>());
}
incidents.get("INCIDENT").clear();
incidents.get("INCIDENT").addAll(realm.copyFromRealm(files));
})
.doOnTerminate(() -> {
Log.e(TAG, "Closing realm");
realm.close();
}))
.doOnCompleted(() -> {
emitStoreChange(new CobaltStore.CobaltStoreChangeEvent());
Log.e(TAG, "EMIT INCIDENT");
})
.doOnError(error -> handleErrorEvent(error))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
1.) Realm.getDefaultInstance().asObservable()
您获得了一个您永远无法关闭的本地 Realm 实例,因此您正在泄漏内存。
2.) 这不是 asObservable()
设计的工作方式。
OnClickListener
完成并发出终端事件是否有意义?
例如,您单击按钮一次,然后可观察对象 "finishes" 并且永远不会再发出任何事件,即使用户之后单击按钮也是如此?
当然不是,因此您也不应期望从 Realm 的 RealmChangeListener
得到它;这是绑定到 Realm 或 RealmResults 的内容,因此您可以收听任何未来对 Realm 或其特定 table.
解决方案可能只是使用类似
的东西Observable.fromCallable(() => {
try(Realm realm = Realm.getDefaultInstance()) {
RealmResults<FileDoc> docs = realm.where(FileDoc.class)
.isNull("parent")
.isNotNull("updatedDate")
.findAllSorted("name");
documents.get("root").clear(); // <-- shouldn't you modify a new copy and return this?
List<FileDoc> unmanagedDocs = realm.copyFromRealm(files);
documents.get("root").addAll(unmanagedDocs);
return unmanagedDocs;
}
}
或者正如您指出的那样,.filter((data) -> data.isLoaded()).first()
也可以,尽管我没有想到这一点。