是否有可能实现像 delay 这样的运算符,但同时也会延迟错误?

Is it possible to implement an operator like delay but that also delays errors?

我现在尝试了一段时间来实现一个扩展功能(只是因为它对我来说更容易),它能够延迟正常的项目排放和错误。现有延迟操作员仅延迟正常项目排放,错误尽快交付。

对于上下文,我试图模仿 Android LiveData 的行为(有点)。 LiveDatas 是一种具有生命周期意识的可观察模式实现。他们的观察者只有在他们处于可以处理该排放的状态时才会收到通知。如果它们还没有准备好,发射将缓存在实时数据中,并在它们准备好后立即传送。

我创建了一个 BehaviourSubject,它在我的 Activity 和 Fragments 发生变化时发出状态。我创建了一个这样的延迟运算符:

fun <T> Flowable<T>.delayUntilActive(): Flowable<T> = delay { lifecycleSubject.toFlowable(BackpressureStrategy.LATEST).filter { it.isActive } }

然后像这样使用它

myUseCase.getFlowable(Unit)
                .map { it.map { it.toDisplayModel() } }
                .delayUntilActive()
                .subscribe({
                    view.displaySomethings(
                }, { }).addTo(disposables)

因此即使 myUseCase 在视图未准备好显示某些内容时发出,发射也不会到达 onNext() 直到视图准备好。问题是我还希望在触发 onError 时将视图显示为 displayError() ,但这也是生命周期敏感的。如果视图未就绪,应用程序将崩溃。

所以我正在寻找一种方法来延迟排放和错误(onComplete 也很好)。这可能吗? 我用 zip、onErrorReturn、delay inside delay 尝试了一些东西,但似乎都不对。如果这有一个我忽略或不可能的非常简单的解决方案,我同样不会留下深刻印象。欢迎任何想法。

奖励:对于 Single 和 Completable 有更好的方法吗?目前我只是将它们转换为可流动的。

提前致谢!

您可以通过 onErrorResumeNext 处理错误,然后通过 delaySubscription 处理相同的错误并延迟它,直到发生您想要的信号以发出所述错误:

source
.onErrorResumeNext({ error -> 
     Observable.error(error)
     .delaySubscription(lifecycleSubject.filter { it.Active } ) 
})