RxJava Observable.create 包装可观察订阅
RxJava Observable.create wrapping observable subscriptions
我使用了 Observable.create 这样我就可以在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来什么问题吗?我并不完全熟悉使用 Observable.create 创建可观察对象,所以我想确保我没有做任何不寻常的事情或滥用它。提前致谢!
abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {
abstract fun fetchFromApi(): Single<ApiType>
abstract fun fetchFromDb(): Observable<Optional<DbType>>
abstract fun saveToDb(apiType: ApiType?)
abstract fun shouldFetchFromApi(cache: DbType?): Boolean
fun fetch(): Observable<Optional<DbType>> {
return Observable.create<Optional<DbType>> {
val subscriber = it
fetchFromDb()
.subscribe({
subscriber.onNext(it)
if(shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.map {
saveToDb(it)
it
}
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
.subscribe({
subscriber.onNext(it)
subscriber.onComplete()
})
}
else {
subscriber.onComplete()
}
})
}
}
}
是的,这会导致问题。
首先,像这样嵌套 Observable
不是惯用的,反应式方法的优势之一是组合 Observables
,因此只有一个干净的流。通过这种方式,你打破了链条,直接的结果是交织在一起的代码更难阅读,更多的代码来连接通知事件,基本上就像用 Observable
.[= 包装异步回调方法一样30=]
在这里,因为你已经有了反应式组件,你可以简单地组合它们,而不是用回调方法处理它们。
其次,作为断链的结果,最严重和最直接的一个 - 取消订阅外部 Observable
不会自动影响内部 Observable
。尝试添加 subscribeOn()
也是如此,在背压很重要的不同情况下,它也适用。
作曲的替代方案可能是这样的:
fun fetch2(): Observable<Optional<DbType>> {
return fetchFromDb()
.flatMap {
if (shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess { saveToDb(it) }
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
} else {
Observable.empty()
}
}
}
如果出于某种原因,无论如何您希望单独发出第一个 fetchFromDb()
结果,您也可以使用 publish()
和选择器来实现:
fun fetch2(): Observable<Optional<DbType>> {
return fetchFromDb()
.publish {
Observable.merge(it,
it.flatMap {
if (shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess { saveToDb(it) }
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
} else {
Observable.empty()
}
})
}
}
我使用了 Observable.create 这样我就可以在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来什么问题吗?我并不完全熟悉使用 Observable.create 创建可观察对象,所以我想确保我没有做任何不寻常的事情或滥用它。提前致谢!
abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {
abstract fun fetchFromApi(): Single<ApiType>
abstract fun fetchFromDb(): Observable<Optional<DbType>>
abstract fun saveToDb(apiType: ApiType?)
abstract fun shouldFetchFromApi(cache: DbType?): Boolean
fun fetch(): Observable<Optional<DbType>> {
return Observable.create<Optional<DbType>> {
val subscriber = it
fetchFromDb()
.subscribe({
subscriber.onNext(it)
if(shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.map {
saveToDb(it)
it
}
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
.subscribe({
subscriber.onNext(it)
subscriber.onComplete()
})
}
else {
subscriber.onComplete()
}
})
}
}
}
是的,这会导致问题。
首先,像这样嵌套 Observable
不是惯用的,反应式方法的优势之一是组合 Observables
,因此只有一个干净的流。通过这种方式,你打破了链条,直接的结果是交织在一起的代码更难阅读,更多的代码来连接通知事件,基本上就像用 Observable
.[= 包装异步回调方法一样30=]
在这里,因为你已经有了反应式组件,你可以简单地组合它们,而不是用回调方法处理它们。
其次,作为断链的结果,最严重和最直接的一个 - 取消订阅外部 Observable
不会自动影响内部 Observable
。尝试添加 subscribeOn()
也是如此,在背压很重要的不同情况下,它也适用。
作曲的替代方案可能是这样的:
fun fetch2(): Observable<Optional<DbType>> {
return fetchFromDb()
.flatMap {
if (shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess { saveToDb(it) }
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
} else {
Observable.empty()
}
}
}
如果出于某种原因,无论如何您希望单独发出第一个 fetchFromDb()
结果,您也可以使用 publish()
和选择器来实现:
fun fetch2(): Observable<Optional<DbType>> {
return fetchFromDb()
.publish {
Observable.merge(it,
it.flatMap {
if (shouldFetchFromApi(it.get())) {
fetchFromApi()
.observeOn(schedulerProvider.io())
.doOnSuccess { saveToDb(it) }
.observeOn(schedulerProvider.ui())
.flatMapObservable {
fetchFromDb()
}
} else {
Observable.empty()
}
})
}
}