使用 countDownLatch.await() 确保交付结果
using countDownLatch.await() to make sure result is delivered
完整的源代码可以在这里找到:https://github.com/alirezaeiii/SavingGoals-Cache
这是 LocalDataSource class :
@Singleton
class QapitalLocalDataSource @Inject constructor(
private val goalsDao: GoalsDao
) : LocalDataSource {
override fun getSavingsGoals(): Single<List<SavingsGoal>> =
Single.create { singleSubscriber ->
goalsDao.getGoals()
.subscribe {
if (it.isEmpty()) {
singleSubscriber.onError(NoDataException())
} else {
singleSubscriber.onSuccess(it)
}
}
}
}
以上方法已在存储库中使用 class :
@Singleton
class GoalsRepository @Inject constructor(
private val remoteDataSource: QapitalService,
private val localDataSource: LocalDataSource,
private val schedulerProvider: BaseSchedulerProvider
) {
private var cacheIsDirty = false
fun getSavingsGoals(): Observable<List<SavingsGoal>> {
lateinit var goals: Observable<List<SavingsGoal>>
if (cacheIsDirty) {
goals = getGoalsFromRemoteDataSource()
} else {
val latch = CountDownLatch(1)
var disposable: Disposable? = null
disposable = localDataSource.getSavingsGoals()
.observeOn(schedulerProvider.io())
.doFinally {
latch.countDown()
disposable?.dispose()
}.subscribe({
goals = Observable.create { emitter -> emitter.onNext(it) }
}, { goals = getGoalsFromRemoteDataSource() })
latch.await()
}
return goals
}
}
如您所见,我正在使用 countDownLatch.await() 来确保结果在订阅或错误块中发出。在使用 RxJava 时有没有比使用 CountDownLatch
更好的解决方案?
latch.await()
阻塞了线程,这有点破坏了使用像 RxJava 这样的异步 API 的全部意义。
RxJava 有 APIs 像 onErrorResumeNext
来处理异常和 toObservable
将 Single
结果转换为 Observable
结果。
此外,像这样的 RxJava 类型通常是冷的(它们不会 运行 或在您订阅之前找出任何东西)所以我建议在订阅发生之前不要检查 cacheIsDirty。
我会选择类似的东西:
fun getSavingsGoals(): Observable<List<SavingsGoal>> {
return Observable
.fromCallable { cacheIsDirty }
.flatMap {
if (it) {
getGoalsFromRemoteDataSource()
} else {
localDataSource.getSavingsGoals()
.toObservable()
.onErrorResumeNext(getGoalsFromRemoteDataSource())
}
}
}
顺便说一句,如果您已经在使用 Kotlin,我强烈推荐协程。然后您的异步代码最终会像常规顺序代码一样读取。
完整的源代码可以在这里找到:https://github.com/alirezaeiii/SavingGoals-Cache
这是 LocalDataSource class :
@Singleton
class QapitalLocalDataSource @Inject constructor(
private val goalsDao: GoalsDao
) : LocalDataSource {
override fun getSavingsGoals(): Single<List<SavingsGoal>> =
Single.create { singleSubscriber ->
goalsDao.getGoals()
.subscribe {
if (it.isEmpty()) {
singleSubscriber.onError(NoDataException())
} else {
singleSubscriber.onSuccess(it)
}
}
}
}
以上方法已在存储库中使用 class :
@Singleton
class GoalsRepository @Inject constructor(
private val remoteDataSource: QapitalService,
private val localDataSource: LocalDataSource,
private val schedulerProvider: BaseSchedulerProvider
) {
private var cacheIsDirty = false
fun getSavingsGoals(): Observable<List<SavingsGoal>> {
lateinit var goals: Observable<List<SavingsGoal>>
if (cacheIsDirty) {
goals = getGoalsFromRemoteDataSource()
} else {
val latch = CountDownLatch(1)
var disposable: Disposable? = null
disposable = localDataSource.getSavingsGoals()
.observeOn(schedulerProvider.io())
.doFinally {
latch.countDown()
disposable?.dispose()
}.subscribe({
goals = Observable.create { emitter -> emitter.onNext(it) }
}, { goals = getGoalsFromRemoteDataSource() })
latch.await()
}
return goals
}
}
如您所见,我正在使用 countDownLatch.await() 来确保结果在订阅或错误块中发出。在使用 RxJava 时有没有比使用 CountDownLatch
更好的解决方案?
latch.await()
阻塞了线程,这有点破坏了使用像 RxJava 这样的异步 API 的全部意义。
RxJava 有 APIs 像 onErrorResumeNext
来处理异常和 toObservable
将 Single
结果转换为 Observable
结果。
此外,像这样的 RxJava 类型通常是冷的(它们不会 运行 或在您订阅之前找出任何东西)所以我建议在订阅发生之前不要检查 cacheIsDirty。
我会选择类似的东西:
fun getSavingsGoals(): Observable<List<SavingsGoal>> {
return Observable
.fromCallable { cacheIsDirty }
.flatMap {
if (it) {
getGoalsFromRemoteDataSource()
} else {
localDataSource.getSavingsGoals()
.toObservable()
.onErrorResumeNext(getGoalsFromRemoteDataSource())
}
}
}
顺便说一句,如果您已经在使用 Kotlin,我强烈推荐协程。然后您的异步代码最终会像常规顺序代码一样读取。