使用 countDownLatch.await() 确保交付结果

using countDownLatch.await() to make sure result is delivered

完整的源代码可以在这里找到:https://github.com/alirezaeiii/SavingGoals-Cache

这是 LocalDataSource class :

@Singleton
class QapitalLocalDataSource @Inject constructor(
    private val goalsDao: GoalsDao
) : LocalDataSource {

    override fun getSavingsGoals(): Single<List<SavingsGoal>> =
        Single.create { singleSubscriber ->
            goalsDao.getGoals()
                .subscribe {
                    if (it.isEmpty()) {
                        singleSubscriber.onError(NoDataException())
                    } else {
                        singleSubscriber.onSuccess(it)
                    }
                }
        }
}

以上方法已在存储库中使用 class :

@Singleton
class GoalsRepository @Inject constructor(
    private val remoteDataSource: QapitalService,
    private val localDataSource: LocalDataSource,
    private val schedulerProvider: BaseSchedulerProvider
) {

    private var cacheIsDirty = false

    fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        lateinit var goals: Observable<List<SavingsGoal>>
        if (cacheIsDirty) {
            goals = getGoalsFromRemoteDataSource()
        } else {
            val latch = CountDownLatch(1)
            var disposable: Disposable? = null
            disposable = localDataSource.getSavingsGoals()
                .observeOn(schedulerProvider.io())
                .doFinally {
                    latch.countDown()
                    disposable?.dispose()
                }.subscribe({
                    goals = Observable.create { emitter -> emitter.onNext(it) }
                }, { goals = getGoalsFromRemoteDataSource() })
            latch.await()
        }
        return goals
    }
}

如您所见,我正在使用 countDownLatch.await() 来确保结果在订阅或错误块中发出。在使用 RxJava 时有没有比使用 CountDownLatch 更好的解决方案?

latch.await() 阻塞了线程,这有点破坏了使用像 RxJava 这样的异步 API 的全部意义。

RxJava 有 APIs 像 onErrorResumeNext 来处理异常和 toObservableSingle 结果转换为 Observable 结果。

此外,像这样的 RxJava 类型通常是冷的(它们不会 运行 或在您订阅之前找出任何东西)所以我建议在订阅发生之前不要检查 cacheIsDirty。

我会选择类似的东西:

    fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        return Observable
            .fromCallable { cacheIsDirty }
            .flatMap {
                if (it) {
                    getGoalsFromRemoteDataSource()
                } else {
                    localDataSource.getSavingsGoals()
                        .toObservable()
                        .onErrorResumeNext(getGoalsFromRemoteDataSource())
                }
            }
    }

顺便说一句,如果您已经在使用 Kotlin,我强烈推荐协程。然后您的异步代码最终会像常规顺序代码一样读取。