为什么我的订阅永远无法完成?

Why does my subscribe never complete?

我正在尝试在 android 的上下文中学习 rxJava 和反应式编程,我觉得我快到了,我只是无法完全掌握完整的画面以完全理解我在做什么。

我有下面的代码,它从数据库

中获取一个名为 iApps 的 class 的实例列表
 myHelper m = new myHelper(getApplication());
        m.getApps()
                .observeOn(Schedulers.newThread())
                .subscribe(currentApps::addAll,
                        throwable -> Log.e("Error Observable", throwable.toString() + " " + Arrays.toString(throwable.getStackTrace())),
                        () -> compareLists(availableApps, currentApps));
}

其中使用了以下方法: //来自我的数据库调用函数

public  Callable<ArrayList<iApp>> getApps()
    {
        return this::getCurrentInfo;
    }

自定义辅助函数

public class myHelper {

    Context ctx;
    tQuery t;
    public myHelper(Context _ctx)
    {
        this.ctx = _ctx;
        t = new tQuery(_ctx);
    }

    Observable<ArrayList<iApp>> getApps()
    {
        return makeObservable(t.getApps())
                .subscribeOn(Schedulers.computation());
    }

    private static <T> Observable<T> makeObservable(final Callable<T> func) {
        return Observable.create(
                new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        try {
                            subscriber.onNext(func.call());

                        } catch (Exception ex) {
                            subscriber.onError(ex);
                        }
                    }
                });
    }

}

但是我从来没有 运行s。我通过遍历 iApp 的结果并输出其中一个字段来检查 onNext,这样我就可以看到正在收集数据,但是我的 compareLists 函数永远不会 运行.

有人可以解释我的疏忽吗?

好尴尬!

private static <T> Observable<T> makeObservable(final Callable<T> func) {
        return Observable.create(
                new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        try {
                            subscriber.onNext(func.call());
                            subscriber.onCompleted();

                        } catch (Exception ex) {
                            subscriber.onError(ex);
                        }
                    }
                });
    }