来自 coroutines-rx2 lib 的 rxObservable 不适用于 Observables.combineLatest
rxObservable from coroutines-rx2 lib doesn't work with Observables.combineLatest
我正在尝试逐渐将 android 代码库从 RxJava2 转换为 Kotlin 协程。我们正在使用 UseCases
和 Repositories
。我已将返回 Observable
的存储库方法之一转换为 suspend
函数。
现在,有一个 UseCase
使用 Observables.combineLatest
组合了 2 个存储库 Observables
,其中一个是我转换为 suspend
的那个。
为了仍然按原样使用 UseCase
函数,我使用 kotlinx-coroutines-rx2
将 suspend
函数转换为 observable
,它提供了 rxjava 和协程之间的互操作.我专门使用 this 方法。
代码如下所示:
override fun execute(): Observable<GetFollowersResult> {
return Observables.combineLatest(
// This suspend function is not getting called
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() },
profileRepository.getProfile().toObservable()
) { followers, profile ->
// mapping code
}.subscribeOn(threadExecutor)
.map<GetFollowersResult> { page ->
// result
}
.onErrorReturn { throwable ->
// error
}
.observeOn(postExecutionThread)
.startWith(GetFollowersResult.InFlight)
}
但是即使订阅了 combineLatest
返回的 observable,rxObservable
中的挂起函数也不会被调用。
我错过了什么吗?我无法将另一种方法转换为 suspend
,因为该方法在很多地方使用,我仍然想保留 suspend
函数,因为我们需要在较新的 UseCases
中使用它。
我建议你更换:
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() }
与:
rxSingle(Dispatchers.IO) { profileRepository.getFollowers() }.toObservable()
因为 suspend
函数的 Rx 等价物是 Single
(当它 returns 一个值时)或 Completable
(当它 returns Unit
).
如果您查看 rxSingle
和 rxObservable
的 Javadoc,您会发现细微差别:
rxObservable
returnsUnit
中的block
函数,意思是里面需要用send
来发射物品
rxSingle
returnsT
中的block
函数,意思是returns里面调用的suspend函数的值
我正在尝试逐渐将 android 代码库从 RxJava2 转换为 Kotlin 协程。我们正在使用 UseCases
和 Repositories
。我已将返回 Observable
的存储库方法之一转换为 suspend
函数。
现在,有一个 UseCase
使用 Observables.combineLatest
组合了 2 个存储库 Observables
,其中一个是我转换为 suspend
的那个。
为了仍然按原样使用 UseCase
函数,我使用 kotlinx-coroutines-rx2
将 suspend
函数转换为 observable
,它提供了 rxjava 和协程之间的互操作.我专门使用 this 方法。
代码如下所示:
override fun execute(): Observable<GetFollowersResult> {
return Observables.combineLatest(
// This suspend function is not getting called
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() },
profileRepository.getProfile().toObservable()
) { followers, profile ->
// mapping code
}.subscribeOn(threadExecutor)
.map<GetFollowersResult> { page ->
// result
}
.onErrorReturn { throwable ->
// error
}
.observeOn(postExecutionThread)
.startWith(GetFollowersResult.InFlight)
}
但是即使订阅了 combineLatest
返回的 observable,rxObservable
中的挂起函数也不会被调用。
我错过了什么吗?我无法将另一种方法转换为 suspend
,因为该方法在很多地方使用,我仍然想保留 suspend
函数,因为我们需要在较新的 UseCases
中使用它。
我建议你更换:
rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() }
与:
rxSingle(Dispatchers.IO) { profileRepository.getFollowers() }.toObservable()
因为 suspend
函数的 Rx 等价物是 Single
(当它 returns 一个值时)或 Completable
(当它 returns Unit
).
如果您查看 rxSingle
和 rxObservable
的 Javadoc,您会发现细微差别:
rxObservable
returnsUnit
中的block
函数,意思是里面需要用send
来发射物品rxSingle
returnsT
中的block
函数,意思是returns里面调用的suspend函数的值