RxJava 棘手的 startWith(Observable)
RxJava tricky startWith(Observable)
以下代码仅在 observable2 完成后才从 observable1 发出项目。
observable1.startWith(observable2)
.subscribe()
我需要实现另一种行为
observable1 -> 0 1 2 3
observable2 -> 1 2 3 4 5 6
observable1.startWithDefault(observable2)
-> 1 2 0 1 2 3
第二个 observable 仅在第一个 observable 为空时发射项目,然后发射第一个 observable 的项目。
我无法仅使用基本运算符找到正确的解决方案,自定义运算符 startWithDefault 的正确 RxJava 2 实现应该是什么样子?
P.S.
observable1.subscribe()
observable2.takeUntil(observable1).subscribe()
不是正确的解决方案,因为在立即从 observable1 发出的情况下存在竞争
方向很好,但你需要publish(Function)
分享observable1
的信号,concatEager
在开关出现时不丢失元素:
observable1.publish(o ->
Observable.concatEager(observable2.takeUntil(o), o)
)
.subscribe(...)
以下代码仅在 observable2 完成后才从 observable1 发出项目。
observable1.startWith(observable2)
.subscribe()
我需要实现另一种行为
observable1 -> 0 1 2 3
observable2 -> 1 2 3 4 5 6
observable1.startWithDefault(observable2)
-> 1 2 0 1 2 3
第二个 observable 仅在第一个 observable 为空时发射项目,然后发射第一个 observable 的项目。
我无法仅使用基本运算符找到正确的解决方案,自定义运算符 startWithDefault 的正确 RxJava 2 实现应该是什么样子?
P.S.
observable1.subscribe()
observable2.takeUntil(observable1).subscribe()
不是正确的解决方案,因为在立即从 observable1 发出的情况下存在竞争
方向很好,但你需要publish(Function)
分享observable1
的信号,concatEager
在开关出现时不丢失元素:
observable1.publish(o ->
Observable.concatEager(observable2.takeUntil(o), o)
)
.subscribe(...)