Rxjava while循环获取数据库对象并上传到服务器

Rxjava while loop to get database object and upload to server

我对如何在 RxJava 中实现它感到困惑。 我想

  1. 从我的数据库中取出一个对象
  2. 上传
  3. 从数据库中删除它
  4. 从数据库中取出下一项并重复 2 和 3
  5. 当数据库没有剩余对象时完成

我知道如何通过首先从数据库加载所有对象并创建一个 Observable 像这样 Observable.fromIterable(allMyDbObjects) 来做到这一点,但是我想一次只获取一个对象,以防万一数据库在我上传时更新。 我不知道该怎么做。我查看了 repeatUntil,但它似乎立即重复。这是我在想什么的伪代码:

getFirstDbObject()
    .flatMapCompletable { obj ->
        upload(obj)
            .doOnComplete {
                 deleteFromDb(obj)
            }
    }
    .repeatUntil {
        // dbIsEmptyLogic. 
        // This doesn't work. I need to somehow call getFirstDbObject() again
    }

有人能帮忙吗?

这是我的代码库中的一个有效解决方案。

    val source = HashSet<String>()
    source.add("a")
    source.add("b")
    source.add("c")
    source.add("d")
    source.add("e")
    io.reactivex.Observable.just(Unit)
        .flatMap { it ->
            io.reactivex.Observable.fromCallable {
                println("first flatmap print $it")
                // uploadObj()
                source.first()
            }
        }.flatMap {
            // delete
            io.reactivex.Observable.fromCallable {
                source.remove(it)
                println("second flatmap remove $it")
                // delete object
            }
        }
        .repeatUntil { source.isEmpty() }
        .subscribe()

假设getFirstDbObject() returns一个Maybe,你可以通过将结果映射到布尔值来实现(true如果数据库为空,false如果不是)然后重复序列直到 getFirstDbObject() returns 为空并且流完成。

getFirstDbObject()
    .toObservable()
    .flatMapSingle { obj ->
        upload(obj)
            .doOnComplete { deleteFromDb(obj) } // probably better to use .andThen(deleteFromDb(obj)) if delete also returns a completable
            .toSingleDefault(false)
    }
    .defaultIfEmpty(true)
    .repeat()
    .takeUntil { isComplete ->
        isComplete
    }