来自 coroutines-rx2 lib 的 rxObservable 不适用于 Observables.combineLatest

rxObservable from coroutines-rx2 lib doesn't work with Observables.combineLatest

我正在尝试逐渐将 android 代码库从 RxJava2 转换为 Kotlin 协程。我们正在使用 UseCasesRepositories。我已将返回 Observable 的存储库方法之一转换为 suspend 函数。

现在,有一个 UseCase 使用 Observables.combineLatest 组合了 2 个存储库 Observables,其中一个是我转换为 suspend 的那个。

为了仍然按原样使用 UseCase 函数,我使用 kotlinx-coroutines-rx2suspend 函数转换为 observable,它提供了 rxjava 和协程之间的互操作.我专门使用 this 方法。

代码如下所示:

override fun execute(): Observable<GetFollowersResult> {
    return Observables.combineLatest(
        // This suspend function is not getting called
        rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() },
        profileRepository.getProfile().toObservable()
    ) { followers, profile ->
        // mapping code
    }.subscribeOn(threadExecutor)
        .map<GetFollowersResult> { page ->
            // result
        }
        .onErrorReturn { throwable ->
            // error
        }
        .observeOn(postExecutionThread)
        .startWith(GetFollowersResult.InFlight)
}

但是即使订阅了 combineLatest 返回的 observable,rxObservable 中的挂起函数也不会被调用。

我错过了什么吗?我无法将另一种方法转换为 suspend,因为该方法在很多地方使用,我仍然想保留 suspend 函数,因为我们需要在较新的 UseCases 中使用它。

我建议你更换:

rxObservable<ProfilesPageDomainModel>(Dispatchers.IO) { profileRepository.getFollowers() }

与:

rxSingle(Dispatchers.IO) { profileRepository.getFollowers() }.toObservable()

因为 suspend 函数的 Rx 等价物是 Single(当它 returns 一个值时)或 Completable(当它 returns Unit).

如果您查看 rxSinglerxObservable 的 Javadoc,您会发现细微差别:

  • rxObservablereturnsUnit中的block函数,意思是里面需要用send来发射物品
  • rxSinglereturnsT中的block函数,意思是returns里面调用的suspend函数的值