RxJava 棘手的 startWith(Observable)

RxJava tricky startWith(Observable)

以下代码仅在 observable2 完成后才从 observable1 发出项目。

observable1.startWith(observable2)
           .subscribe()

我需要实现另一种行为

observable1 ->       0   1   2   3
observable2 -> 1   2   3   4   5   6

observable1.startWithDefault(observable2)
            -> 1   2 0   1   2   3

第二个 observable 仅在第一个 observable 为空时发射项目,然后发射第一个 observable 的项目。

我无法仅使用基本运算符找到正确的解决方案,自定义运算符 startWithDefault 的正确 RxJava 2 实现应该是什么样子?

P.S.

observable1.subscribe()
observable2.takeUntil(observable1).subscribe()

不是正确的解决方案,因为在立即从 observable1 发出的情况下存在竞争

方向很好,但你需要publish(Function)分享observable1的信号,concatEager在开关出现时不丢失元素:

observable1.publish(o -> 
    Observable.concatEager(observable2.takeUntil(o), o)
)
.subscribe(...)