处理 "onNext" 中针对热、共享、可观察对象抛出的错误的正确方法

Proper way to deal with errors thrown in "onNext" for hot, shared, observables

在 RxJS 版本 5 中,以下代码导致进程在两次订阅迭代三次后终止:

var Rx = require("rxjs");

const published$ = Rx.Observable.interval(1000).publish();

published$.subscribe(index => {
    console.log(`One: ${index}`);

    if (index == 3) throw new Error("ded.");
});

published$.forEach(index => {
    console.log(`Two: ${index}`);
});

published$.connect();

但是,我的理解是下一个处理程序中抛出的错误只会取消订阅该特定订阅,而不会导致底层可观察对象终止。我的预期输出是 "One" 订阅将取消订阅,但间隔将继续为 "Two" 订阅产生结果。

这种行为给我带来了问题,我可能对一个底层热可观察对象有多个订阅——但是在任何这些订阅上抛出的一个异常都会导致底层可观察对象完全终止。

当我使用热模块重新加载进行开发时,这尤其烦人,因为任何订阅中的任何编程错误都会导致我不得不刷新整个页面以重新启动可观察序列。

有没有一种方法,无需将我的每个订阅都包装在 try/catch 中,从而在我的下一个处理程序中抛出异常,从而简单地取消订阅该 ONE 订阅,而不终止底层的可观察对象?

------------编辑------------

通过在 "subscribe" 返回的订阅对象上将 syncErrorThrowable 设置为 true,我找到了我正在寻找的行为。似乎在代码库中唯一一次将此设置为 true 是通过 "do" 运算符。

我应该利用这个字段吗?我觉得使用它很脏,但另一方面,我发现 "do" 运算符与 "next" 订阅处理程序具有不同的错误处理语义很奇怪。

这是受此标志影响的主要代码块: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L132

如果设置为 false,将调用此方法: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L179

而如果将其设置为 true,则会改用此方法: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L188

不同之处在于第一个方法会重新抛出异常返回调用堆栈,而第二个方法会将错误转发给后续订阅。

为什么 do 运算符向前传播错误,而 "next" 处理程序将错误向上冒泡?这对我来说似乎很奇怪。

不,不要使用该字段。如果您将其改回 true,您的订阅将开始吞噬错误。

这是我们用来了解订阅是同步通知(与源 Observable 的订阅调用在同一块中)还是异步通知的一些私有状态。如果在同步通知期间从订阅者的消息处理程序之一抛出错误,我们将推迟重新抛出它,直到我们退出 Observable 的订阅回调。[1]

如果您的处理程序抛出您想转发给订阅的 onError 处理程序的错误,目前的指导是将它们移到订阅上方的 do 块中。

不,我不同意这种行为。这里有一些上下文链接:

[1]来源:我写了这段代码。