如何在 RxJava2 中连接 EagerDelayError

How to concatEagerDelayError in RxJava2

如何实现 Observable.concatEagerDelayError 或 RxJava2/RxKotlin2 中的等效项?

有:

但不是:

我有什么:

fun getAll(): Observable<List<User>> = Observable.concatArrayDelayError(
    // from db
    userDAO
        .selectAll()
        .subscribeOn(ioScheduler),
    // from api
    userAPI
        .getAll()
        .doOnNext { lstUser -> Completable.concatArray(
            userDAO.deleteAll().subscribeOn(ioScheduler),
            userDAO.save(lstUser).subscribeOn(ioScheduler)
        ) }
        .subscribeOn(ioScheduler)
)

我想要相同的行为,但急切地想要 selectAll() 和 getAll(),因为没有理由等待数据库启动网络调用。

使用concatMapEagerDelayError:

 Observable.fromIterable(sources)
 .concatMapEagerDelayError(v -> v, true);

 Observable.fromArray(source1, source2, source3)
 .concatMapEagerDelayError(v -> v, true);

JavaDoc.

编辑:

fun getAll(): Observable<List<User>> = Observable.fromArray(
    // from db
    userDAO
        .selectAll()
        .subscribeOn(ioScheduler),
    // from api
    userAPI
       .getAll()
       // --- this makes no sense by the way -------------------
       .doOnNext { lstUser -> Completable.concatArray(
            userDAO.deleteAll().subscribeOn(ioScheduler),
            userDAO.save(lstUser).subscribeOn(ioScheduler)
       )}
       // ------------------------------------------------------
       .subscribeOn(ioScheduler)
)
.concatMapEagerDelayError({ v -> v }, true)