Rxjava while循环获取数据库对象并上传到服务器
Rxjava while loop to get database object and upload to server
我对如何在 RxJava 中实现它感到困惑。
我想
- 从我的数据库中取出一个对象
- 上传
- 从数据库中删除它
- 从数据库中取出下一项并重复 2 和 3
- 当数据库没有剩余对象时完成
我知道如何通过首先从数据库加载所有对象并创建一个 Observable
像这样 Observable.fromIterable(allMyDbObjects)
来做到这一点,但是我想一次只获取一个对象,以防万一数据库在我上传时更新。
我不知道该怎么做。我查看了 repeatUntil
,但它似乎立即重复。这是我在想什么的伪代码:
getFirstDbObject()
.flatMapCompletable { obj ->
upload(obj)
.doOnComplete {
deleteFromDb(obj)
}
}
.repeatUntil {
// dbIsEmptyLogic.
// This doesn't work. I need to somehow call getFirstDbObject() again
}
有人能帮忙吗?
这是我的代码库中的一个有效解决方案。
val source = HashSet<String>()
source.add("a")
source.add("b")
source.add("c")
source.add("d")
source.add("e")
io.reactivex.Observable.just(Unit)
.flatMap { it ->
io.reactivex.Observable.fromCallable {
println("first flatmap print $it")
// uploadObj()
source.first()
}
}.flatMap {
// delete
io.reactivex.Observable.fromCallable {
source.remove(it)
println("second flatmap remove $it")
// delete object
}
}
.repeatUntil { source.isEmpty() }
.subscribe()
假设getFirstDbObject()
returns一个Maybe,你可以通过将结果映射到布尔值来实现(true
如果数据库为空,false
如果不是)然后重复序列直到 getFirstDbObject()
returns 为空并且流完成。
getFirstDbObject()
.toObservable()
.flatMapSingle { obj ->
upload(obj)
.doOnComplete { deleteFromDb(obj) } // probably better to use .andThen(deleteFromDb(obj)) if delete also returns a completable
.toSingleDefault(false)
}
.defaultIfEmpty(true)
.repeat()
.takeUntil { isComplete ->
isComplete
}
我对如何在 RxJava 中实现它感到困惑。 我想
- 从我的数据库中取出一个对象
- 上传
- 从数据库中删除它
- 从数据库中取出下一项并重复 2 和 3
- 当数据库没有剩余对象时完成
我知道如何通过首先从数据库加载所有对象并创建一个 Observable
像这样 Observable.fromIterable(allMyDbObjects)
来做到这一点,但是我想一次只获取一个对象,以防万一数据库在我上传时更新。
我不知道该怎么做。我查看了 repeatUntil
,但它似乎立即重复。这是我在想什么的伪代码:
getFirstDbObject()
.flatMapCompletable { obj ->
upload(obj)
.doOnComplete {
deleteFromDb(obj)
}
}
.repeatUntil {
// dbIsEmptyLogic.
// This doesn't work. I need to somehow call getFirstDbObject() again
}
有人能帮忙吗?
这是我的代码库中的一个有效解决方案。
val source = HashSet<String>()
source.add("a")
source.add("b")
source.add("c")
source.add("d")
source.add("e")
io.reactivex.Observable.just(Unit)
.flatMap { it ->
io.reactivex.Observable.fromCallable {
println("first flatmap print $it")
// uploadObj()
source.first()
}
}.flatMap {
// delete
io.reactivex.Observable.fromCallable {
source.remove(it)
println("second flatmap remove $it")
// delete object
}
}
.repeatUntil { source.isEmpty() }
.subscribe()
假设getFirstDbObject()
returns一个Maybe,你可以通过将结果映射到布尔值来实现(true
如果数据库为空,false
如果不是)然后重复序列直到 getFirstDbObject()
returns 为空并且流完成。
getFirstDbObject()
.toObservable()
.flatMapSingle { obj ->
upload(obj)
.doOnComplete { deleteFromDb(obj) } // probably better to use .andThen(deleteFromDb(obj)) if delete also returns a completable
.toSingleDefault(false)
}
.defaultIfEmpty(true)
.repeat()
.takeUntil { isComplete ->
isComplete
}