RxJava Observable.create 包装可观察订阅

RxJava Observable.create wrapping observable subscriptions

我使用了 Observable.create 这样我就可以在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来什么问题吗?我并不完全熟悉使用 Observable.create 创建可观察对象,所以我想确保我没有做任何不寻常的事情或滥用它。提前致谢!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  {
        return Observable.create<Optional<DbType>> {
            val subscriber = it

            fetchFromDb()
                    .subscribe({
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map {
                                        saveToDb(it)
                                        it
                                    }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
                                    .subscribe({
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    })
                        }
                        else {
                            subscriber.onComplete()
                        }
                    })

        }
    }
}

是的,这会导致问题。

首先,像这样嵌套 Observable 不是惯用的,反应式方法的优势之一是组合 Observables,因此只有一个干净的流。通过这种方式,你打破了链条,直接的结果是交织在一起的代码更难阅读,更多的代码来连接通知事件,基本上就像用 Observable.[= 包装异步回调方法一样30=] 在这里,因为你已经有了反应式组件,你可以简单地组合它们,而不是用回调方法处理它们。

其次,作为断链的结果,最严重和最直接的一个 - 取消订阅外部 Observable 不会自动影响内部 Observable。尝试添加 subscribeOn() 也是如此,在背压很重要的不同情况下,它也适用。

作曲的替代方案可能是这样的:

fun fetch2(): Observable<Optional<DbType>> {
        return fetchFromDb()
                .flatMap {
                    if (shouldFetchFromApi(it.get())) {
                        fetchFromApi()
                                .observeOn(schedulerProvider.io())
                                .doOnSuccess { saveToDb(it) }
                                .observeOn(schedulerProvider.ui())
                                .flatMapObservable {
                                    fetchFromDb()
                                }

                    } else {
                        Observable.empty()
                    }
                }
    }

如果出于某种原因,无论如何您希望单独发出第一个 fetchFromDb() 结果,您也可以使用 publish() 和选择器来实现:

 fun fetch2(): Observable<Optional<DbType>> {
    return fetchFromDb()
            .publish {
                Observable.merge(it,
                        it.flatMap {
                            if (shouldFetchFromApi(it.get())) {
                                fetchFromApi()
                                        .observeOn(schedulerProvider.io())
                                        .doOnSuccess { saveToDb(it) }
                                        .observeOn(schedulerProvider.ui())
                                        .flatMapObservable {
                                            fetchFromDb()
                                        }

                            } else {
                                Observable.empty()
                            }
                        })
            }

}