RxJava2 onNext() 多次调用?

RxJava2 onNext() called multiple times?

我有一个方法 returns 像这样的 Observable:

open fun get(): Observable<Response> {

    return if (condition)
        getDataFromApi()
    else
        getDataFromDb()

}

订阅如下:

                get()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(/*a object of class extending DefaultObserver*/)

我在调用 getDataFromDb() 时遇到了这个问题,并在一段时间后根据条件调用了 getDataFromApi() 。对于第一次调用,它工作正常,但在第二次调用 onNext 时,使用来自 getDataFromDb() 的旧数据响应多次调用 onNext。请让我知道我做错了什么。我对 RxJava 有点陌生。

如果 getDataFromDb() 正在发射项目并且调用了 getDataFromApi() ,第一个方法将继续发射直到完成。如果不再需要流,您应该取消订阅该流,并在您的源中添加条件,以便在取消订阅 observable 时停止发射。

另请记住,在订阅流之前,可观察对象中的函数不会运行。